In [43]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, max, year, row_number
import clickhouse_connect


# Extractor from MsSQL
ms_JDBC_config = "mssql-jdbc-12.8.1.jre11.jar"
JDBC_config = f"{ms_JDBC_config}"
ms_url = "127.0.0.1:1433"
database_name = "Mssql-ETL"
ms_user, ms_password = "sa", "1qaz!QAZ"
table_name = "SONY_daily_data"

spark = SparkSession.builder \
    .appName("MSSQL Data Extraction") \
    .config("spark.jars", JDBC_config) \
    .getOrCreate()

jdbc_url = f"jdbc:sqlserver://{ms_url};databaseName={database_name};encrypt=false;trustServerCertificate=true"
connection_properties = {
    "user": ms_user,
    "password": ms_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

query = f"(SELECT * FROM {table_name}) AS temp"

df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
df.cache()

24/10/25 13:47:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


DataFrame[Date: string, Open: float, High: float, Low: float, Close: float, AdjClose: float, Volume: int]

In [37]:
# Transform to pandas DataFrame
pd_df = df.toPandas()

In [44]:
# Max High Low difference
print("Max High Low difference")
df.createOrReplaceTempView(f"{table_name}_view")
max_high_low_df = spark.sql(f"""
    SELECT DATE, HIGH - LOW AS HIGH_LOW 
    FROM {table_name}_view 
    WHERE HIGH - LOW = (SELECT MAX(HIGH - LOW) FROM {table_name}_view)
""")
max_high_low_df.show()

Max High Low difference
+----------+--------+
|      DATE|HIGH_LOW|
+----------+--------+
|2000-09-27|  13.375|
+----------+--------+



In [45]:
# DataFrame API Query
df_with_diff = df.withColumn("HIGH_LOW", col("HIGH") - col("LOW"))
max_high_low_value = df_with_diff.agg(max("HIGH_LOW")).collect()[0][0]
max_high_low_df = df_with_diff.filter(col("HIGH_LOW") == max_high_low_value)
max_high_low_df.select("DATE", "HIGH_LOW").show()

+----------+--------+
|      DATE|HIGH_LOW|
+----------+--------+
|2000-09-27|  13.375|
+----------+--------+



In [46]:
# Identify the top 3 trading days (by volume) each year,
# along with the corresponding High, Low, and the percentage change between Open and Close prices for those days.
print("Top trading days in year")
top_days_df = spark.sql(f"""
    WITH DateGroup AS (
        SELECT DATE, HIGH, LOW, VOLUME, (CLOSE - OPEN) / CLOSE * 100 AS PC,
               ROW_NUMBER() OVER (PARTITION BY YEAR(DATE) ORDER BY VOLUME DESC) AS RowNum
        FROM {table_name}_view
    )
    SELECT * 
    FROM DateGroup 
    WHERE RowNum < 4 
    ORDER BY YEAR(DATE), RowNum
""")
top_days_df.show()

Top trading days in year
+----------+---------+---------+-------+-------------------+------+
|      DATE|     HIGH|      LOW| VOLUME|                 PC|RowNum|
+----------+---------+---------+-------+-------------------+------+
|1980-12-18| 7.159091| 6.988636|2198460| 1.6000084863278141|     1|
|1980-12-22| 7.215909| 7.102273|1560460|  1.574798362609363|     2|
|1980-12-19| 7.159091| 7.102273| 914980|                0.0|     3|
|1981-03-24| 8.693182| 8.522727|3331900|                0.0|     1|
|1981-03-10| 8.238636| 7.727273|3039520| 3.4965045635516825|     2|
|1981-04-28| 9.147727| 8.920455|2883100| 1.8749942958354404|     3|
|1982-11-10| 7.840909| 7.613636|5396380| 0.7407508714039628|     1|
|1982-12-21| 6.647727| 6.363636|5269660|-1.7543805405045676|     2|
|1982-11-04| 7.045455| 6.818182|5068580|                0.0|     3|
|1983-03-03| 6.136364| 5.909091|5281760| 3.7037077329447583|     1|
|1983-06-28| 6.534091| 6.306818|4388780|-1.7857239197716805|     2|
|1983-03-04| 6.647727| 

In [47]:
# DataFrame API Query
window_spec = Window.partitionBy(year("DATE")).orderBy(col("VOLUME").desc())
df_with_row_num = df.withColumn("ROW_NUM", row_number().over(window_spec))
top_3_per_year = df_with_row_num.filter(col("ROW_NUM") <= 3)
final_top = top_3_per_year.withColumn("PC", (col("CLOSE") - col("OPEN")) / col("CLOSE") * 100)
final_top = final_top.select("DATE", "HIGH", "LOW", "VOLUME", "PC", "ROW_NUM")
final_top.show()

+----------+---------+---------+-------+-------------------+-------+
|      DATE|     HIGH|      LOW| VOLUME|                 PC|ROW_NUM|
+----------+---------+---------+-------+-------------------+-------+
|1980-12-18| 7.159091| 6.988636|2198460| 1.6000084863278141|      1|
|1980-12-22| 7.215909| 7.102273|1560460|  1.574798362609363|      2|
|1980-12-19| 7.159091| 7.102273| 914980|                0.0|      3|
|1981-03-24| 8.693182| 8.522727|3331900|                0.0|      1|
|1981-03-10| 8.238636| 7.727273|3039520| 3.4965045635516825|      2|
|1981-04-28| 9.147727| 8.920455|2883100| 1.8749942958354404|      3|
|1982-11-10| 7.840909| 7.613636|5396380| 0.7407508714039628|      1|
|1982-12-21| 6.647727| 6.363636|5269660|-1.7543805405045676|      2|
|1982-11-04| 7.045455| 6.818182|5068580|                0.0|      3|
|1983-03-03| 6.136364| 5.909091|5281760| 3.7037077329447583|      1|
|1983-06-28| 6.534091| 6.306818|4388780|-1.7857239197716805|      2|
|1983-03-04| 6.647727| 6.136364|43

In [49]:
# Load DataFrame to ClickHouse with pandas DataFrame
ch_host = "127.0.0.1"
ch_port = 8123
ch_database = "default"
ch_table = "SONY_DF"
client = clickhouse_connect.get_client(host=ch_host, port=ch_port, database=ch_database)

create_table_query = f"""
CREATE TABLE IF NOT EXISTS {ch_table} (
    DATE String,
    HIGH Float64,
    LOW Float64,
    VOLUME Float64,
    PC Float64,
    ROW_NUM UInt64
) ENGINE = MergeTree()
ORDER BY DATE;
"""
client.query(create_table_query)

pandas_df = final_top.toPandas()
client.insert_df(table=ch_table, df=pandas_df)

<clickhouse_connect.driver.summary.QuerySummary at 0x164f90290>