In [235]:
# importing some python3 libraries
import os
import shutil
import xml.etree.ElementTree

# importing some PySpark libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [236]:
# Without the LEGACY time-parser setting below nothing works
# (as far as I understand, this is an issue with modern Spark versions).
spark = (
    SparkSession.builder
    .appName("SparkTask")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Start from the bundled sample file; a non-empty answer to the prompt
# replaces it with the user-supplied location.
input_csv = "fin_sample.csv"
path = input("Enter PATH to CSV file or just press ENTER for std df: ")
input_csv = path or input_csv

Enter PATH to CSV file or just press ENTER for std df: 


In [237]:
# Explicit schema for the input CSV; every column is declared nullable.
# MOMENT is read as a plain string here and converted to a timestamp later.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("#SYMBOL", StringType()),
        ("SYSTEM", StringType()),
        ("MOMENT", StringType()),
        ("ID_DEAL", LongType()),
        ("PRICE_DEAL", DoubleType()),
        ("VOLUME", LongType()),
        ("OPEN_POS", LongType()),
        ("DIRECTION", StringType()),
    )
])

# Load the CSV (first line is a header) using the schema above and preview it.
df = spark.read.schema(schema).csv(input_csv, header=True)
df.show(5)

+-------+------+-----------------+---------+----------+------+--------+---------+
|#SYMBOL|SYSTEM|           MOMENT|  ID_DEAL|PRICE_DEAL|VOLUME|OPEN_POS|DIRECTION|
+-------+------+-----------------+---------+----------+------+--------+---------+
|   RIZ1|     F|20111130190000307|463365797|  154680.0|     1|  949182|        B|
|   RIZ1|     F|20111130190000547|463365798|  155000.0|     1|  949186|        B|
|   RIZ1|     F|20111130190000547|463365799|  155000.0|     1|  949186|        B|
|   RIZ1|     F|20111130190000547|463365800|  155200.0|     1|  949186|        B|
|   RIZ1|     F|20111130190001200|463365801|  155555.0|     1|  949190|        B|
+-------+------+-----------------+---------+----------+------+--------+---------+
only showing top 5 rows



In [238]:
# Default candle options. All defaults are integers:
#   candle.width      - passed to F.window as a number of milliseconds
#   candle.date.from  - lower date bound, written as yyyyMMdd
#   candle.date.to    - upper date bound, written as yyyyMMdd
#   candle.time.from  - lower time-of-day bound, compared against hour*100+minute
#   candle.time.to    - upper time-of-day bound, compared against hour*100+minute
options = {
    "candle.width": 300000,
    "candle.date.from": 19000101,
    "candle.date.to": 20200101,
    "candle.time.from": 1000,
    "candle.time.to": 2000,
}

# Names whose values must stay integers even when overridden from XML
# (XML text is always a string, which would otherwise leak into the
# numeric comparisons performed later in the notebook).
integer_option_names = frozenset(options)

# Optionally override the defaults from an XML config file made of
# <property><name>...</name><value>...</value></property> entries.
xml_file_path = input(
    "Enter PATH to XML config file or just press ENTER for std options: "
)
if xml_file_path:
    root = xml.etree.ElementTree.parse(xml_file_path).getroot()
    for prop in root.findall('property'):
        name = prop.findtext('name')
        value = prop.findtext('value')
        if name is None or value is None:
            # Fail with a readable message instead of an AttributeError
            # on a missing child element.
            raise ValueError(
                "Each <property> in the XML config needs both <name> and <value>"
            )
        name = name.strip()
        value = value.strip()
        # Known options keep their integer type; unknown ones are stored as-is.
        options[name] = int(value) if name in integer_option_names else value

Enter PATH to XML config file or just press ENTER for std options: 


In [239]:
# Normalise the bounds to int: values loaded from an XML config may arrive
# as strings, while the defaults are integers.
date_from = int(options["candle.date.from"])
date_to = int(options["candle.date.to"])
time_from = int(options["candle.time.from"])
time_to = int(options["candle.time.to"])

# convert to TIMESTAMP format
# Guarded so that re-running this cell does not try to re-parse a column
# that has already been converted.
if dict(df.dtypes)["MOMENT"] == "string":
    df = df.withColumn("MOMENT", F.to_timestamp("MOMENT", "yyyyMMddHHmmssSSS"))

# Integer keys comparable with the option values:
# yyyyMMdd for the calendar date and HHmm for the time of day.
moment_date = (
    F.year("MOMENT") * 10000 + F.month("MOMENT") * 100 + F.dayofmonth("MOMENT")
)
moment_hhmm = F.hour("MOMENT") * 100 + F.minute("MOMENT")

# filtering according options
# The date bounds were configured but previously never applied.
# NOTE(review): both upper bounds are treated as inclusive, matching the
# original time filter (<=) — confirm whether "to" should be exclusive.
df = df.filter(
    moment_date.between(date_from, date_to) &
    moment_hhmm.between(time_from, time_to)
)
df.show(5)

+-------+------+--------------------+---------+----------+------+--------+---------+
|#SYMBOL|SYSTEM|              MOMENT|  ID_DEAL|PRICE_DEAL|VOLUME|OPEN_POS|DIRECTION|
+-------+------+--------------------+---------+----------+------+--------+---------+
|   RIZ1|     F|2011-11-30 19:00:...|463365797|  154680.0|     1|  949182|        B|
|   RIZ1|     F|2011-11-30 19:00:...|463365798|  155000.0|     1|  949186|        B|
|   RIZ1|     F|2011-11-30 19:00:...|463365799|  155000.0|     1|  949186|        B|
|   RIZ1|     F|2011-11-30 19:00:...|463365800|  155200.0|     1|  949186|        B|
|   RIZ1|     F|2011-11-30 19:00:...|463365801|  155555.0|     1|  949190|        B|
+-------+------+--------------------+---------+----------+------+--------+---------+
only showing top 5 rows



In [240]:
# making candlesticks

# splitting by time windows of `candle.width` milliseconds
window_spec = F.window("MOMENT", f"{options['candle.width']} milliseconds")

# F.first / F.last depend on row order, which is not guaranteed after the
# shuffle performed by groupBy, so OPEN / CLOSE could come from arbitrary
# deals. Structs compare field by field, so the min / max of
# (MOMENT, ID_DEAL, PRICE_DEAL) deterministically picks the earliest /
# latest deal of each window.
# NOTE(review): ties on MOMENT are broken by ID_DEAL, assuming deal ids grow
# in execution order — confirm against the data source.
deal_order = F.struct("MOMENT", "ID_DEAL", "PRICE_DEAL")

# group by #SYMBOL and time windows
# then aggregate it for answer according aliases
candlestick_df = df.groupBy("#SYMBOL", window_spec).agg(
    F.min(deal_order).getField("PRICE_DEAL").alias("OPEN"),
    F.max("PRICE_DEAL").alias("HIGH"),
    F.min("PRICE_DEAL").alias("LOW"),
    F.max(deal_order).getField("PRICE_DEAL").alias("CLOSE"),
)

# select required columns for answer; the candle is stamped with the
# start of its window
candlestick_df = candlestick_df.select(
    F.col("#SYMBOL"),
    F.col("window.start").alias("MOMENT"),
    "OPEN",
    "HIGH",
    "LOW",
    "CLOSE"
)

# order by symbol, then by time (ascending)
candlestick_df = candlestick_df.orderBy("#SYMBOL", "MOMENT")

# converting back to yyyyMMddHHmmssSSS from TIMESTAMP
candlestick_df = candlestick_df.withColumn(
    "MOMENT",
    F.date_format("MOMENT", "yyyyMMddHHmmssSSS")
)

candlestick_df.show(5)

+-------+-----------------+------+------+------+------+
|#SYMBOL|           MOMENT|  OPEN|  HIGH|   LOW| CLOSE|
+-------+-----------------+------+------+------+------+
|   AUZ1|20111130190000000|1.0265|1.0265|1.0265|1.0265|
|   AUZ1|20111130192000000| 1.026| 1.026| 1.026| 1.026|
|   AUZ1|20111130192500000|1.0265|1.0265|1.0265|1.0265|
|   AUZ1|20111130195000000|1.0262|1.0262|1.0262|1.0262|
|   BRZ1|20111130190000000|111.86|111.98|111.67|111.68|
+-------+-----------------+------+------+------+------+
only showing top 5 rows



In [241]:
# writing answer

# The loop below runs one Spark job per symbol (plus one for distinct()).
# Without caching, each of those jobs would re-read the CSV and redo the
# whole filter / aggregate / sort pipeline, so keep the candles cached
# while they are being written.
candlestick_df.cache()
try:
    # selecting unique #SYMBOL
    unique_symbols = [
        row[0]
        for row in candlestick_df.select("#SYMBOL").distinct().collect()
    ]

    output_dir = "AnswerSparkTask"

    # rm -r output_dir, then recreate it empty
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    # writing answer to files: Spark creates one output directory per symbol,
    # named "<symbol>.csv", holding the CSV part files
    for symbol in unique_symbols:
        symbol_df = candlestick_df.filter(F.col("#SYMBOL") == symbol)
        output_path = os.path.join(output_dir, f"{symbol}.csv")
        symbol_df.write.csv(output_path, header=True, mode="overwrite")

        # print some logs
        print(f"Saved candlestick data for {symbol} to {output_path}")
finally:
    # Release the cached data even if a write fails.
    candlestick_df.unpersist()

Saved candlestick data for TTZ1 to AnswerSparkTask/TTZ1.csv
Saved candlestick data for SPZ1 to AnswerSparkTask/SPZ1.csv
Saved candlestick data for CUZ1 to AnswerSparkTask/CUZ1.csv
Saved candlestick data for O4H2 to AnswerSparkTask/O4H2.csv
Saved candlestick data for LKM2 to AnswerSparkTask/LKM2.csv
Saved candlestick data for PTZ1 to AnswerSparkTask/PTZ1.csv
Saved candlestick data for RNZ1 to AnswerSparkTask/RNZ1.csv
Saved candlestick data for FSZ1 to AnswerSparkTask/FSZ1.csv
Saved candlestick data for GZZ1 to AnswerSparkTask/GZZ1.csv
Saved candlestick data for SiM2 to AnswerSparkTask/SiM2.csv
Saved candlestick data for VBH2 to AnswerSparkTask/VBH2.csv
Saved candlestick data for SGZ1 to AnswerSparkTask/SGZ1.csv
Saved candlestick data for LKZ1 to AnswerSparkTask/LKZ1.csv
Saved candlestick data for VXZ1 to AnswerSparkTask/VXZ1.csv
Saved candlestick data for SRZ1 to AnswerSparkTask/SRZ1.csv
Saved candlestick data for GDZ1 to AnswerSparkTask/GDZ1.csv
Saved candlestick data for LKH2 to Answe