# Test PySpark Connection

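This notebook demonstrates how to connect to the Spark master from JupyterLab, load `idc_btt.csv` into a Spark DataFrame, aggregate `value` per device, and write the DataFrame to SQL Server over JDBC.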

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a session attached to the Spark master at spark-master:7077.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("JupyterSpark")
    .getOrCreate()
)

# Report the Spark version of the running session as a quick connectivity check.
print(f"Spark version: {spark.version}")

Spark version: 3.5.0


In [16]:
import os

# List the data directory to see which input files are available.
data_dir = "/home/jovyan/work/data"
files = os.listdir(data_dir)
print(f"Available files: {files}")

Available files: ['.ipynb_checkpoints', 'idc_btt.csv']


In [20]:
import pandas as pd

# Read the CSV with pandas first, then convert the pandas frame into a
# Spark DataFrame via the active session.
csv_path = "/home/jovyan/work/data/idc_btt.csv"
pandas_df = pd.read_csv(csv_path)
df = spark.createDataFrame(pandas_df)

In [4]:
# Look up the address of this application's Spark web UI and print it.
ui_url = spark.sparkContext.uiWebUrl
print(f"Spark UI: {ui_url}")

Spark UI: http://5826486a572b:4040


In [5]:
# Preview the DataFrame; the arguments spell out show()'s defaults
# (first 20 rows, long cell values truncated).
df.show(n=20, truncate=True)

+---+-----------+----+-------------+-------------------+-----+-------------------+
| id|device_name|unit|     unixtime|               time|value|       create_at_bi|
+---+-----------+----+-------------+-------------------+-----+-------------------+
|  1|  IDC1-DCiE|kW h|1667445664452|2022-11-03 10:21:04|57.86|2022-11-04 00:00:03|
|  2|  IDC1-DCiE|kW h|1667445724324|2022-11-03 10:22:04|58.62|2022-11-04 00:00:03|
|  3|  IDC1-DCiE|kW h|1667445784338|2022-11-03 10:23:04|58.75|2022-11-04 00:00:03|
|  4|  IDC1-DCiE|kW h|1667445844345|2022-11-03 10:24:04| 58.4|2022-11-04 00:00:03|
|  5|  IDC1-DCiE|kW h|1667445904432|2022-11-03 10:25:04|59.04|2022-11-04 00:00:03|
|  6|  IDC1-DCiE|kW h|1667445967053|2022-11-03 10:26:07|58.78|2022-11-04 00:00:03|
|  7|  IDC1-DCiE|kW h|1667446024311|2022-11-03 10:27:04| 57.6|2022-11-04 00:00:03|
|  8|  IDC1-DCiE|kW h|1667446084459|2022-11-03 10:28:04|57.06|2022-11-04 00:00:03|
|  9|  IDC1-DCiE|kW h|1667446144312|2022-11-03 10:29:04|57.33|2022-11-04 00:00:03|
| 10

In [6]:
import os

# JDBC URL of the PostgreSQL database `mek` on host.docker.internal:5433.
postgres_url = "jdbc:postgresql://host.docker.internal:5433/mek"

# JDBC connection properties for PostgreSQL.
# Credentials are taken from the environment (MEK_PG_USER / MEK_PG_PASSWORD)
# so real secrets never have to be committed with the notebook. The fallbacks
# preserve the previous hardcoded values and are only suitable for a local
# development database.
properties = {
    "user": os.environ.get("MEK_PG_USER", "airflow"),
    "password": os.environ.get("MEK_PG_PASSWORD", "airflow"),
    "driver": "org.postgresql.Driver",
}

In [7]:
# NOTE: importing `sum` from pyspark shadows Python's built-in sum() for the
# rest of the kernel session.
from pyspark.sql.functions import col, sum

# Cast `value` to double so it can be aggregated numerically.
value_as_double = col("value").cast("double")
df = df.withColumn("value", value_as_double)

In [8]:
# Import the functions module under a namespace so this cell does not rely on
# a bare `sum` name: without the pyspark import having run first, `sum` would
# resolve to Python's built-in and fail on a column name.
from pyspark.sql import functions as F

# Total `value` per device, exposed as the `Total_Value` column.
df_sum = df.groupBy("device_name").agg(F.sum("value").alias("Total_Value"))
df_sum.show()

+--------------------+-------------------+
|         device_name|        Total_Value|
+--------------------+-------------------+
|IDC1-PM-ATS-NORMA...| 4.8228715046625E10|
| IDC2-UPS-2/Total KW|  2108764.799999994|
|           IDC1-DCiE|  2142374.419999999|
| IDC1-UPS-3/Total KW| 2494811.2000000067|
|IDC2-UDB-1/Total kWh| 3.5153927041125E10|
|IDC2-FAC-INET/rou...|  1.801048418025E11|
| IDC2-UPS-1/Total KW| 2228751.8999999976|
|           IDC2-DCiE| 1730658.5999999985|
| IDC2-EMDB/Total kWh|   5.65652402405E11|
|IDC2-PM-ATS-NORMA...|3.59511782104375E10|
|   IDC1-PM AIRDB/kWh|   4.14686837383E11|
| IDC1-UPS-2/Total KW|  2495253.699999997|
| IDC1-UPS-1/Total KW|  2510737.200000004|
|    IDC1-PM EMDB/kWh|  1.137159710324E12|
|            IDC1-PUE|  63485.01999999923|
|IDC2-UDB-2/Total kWh|   3.73490688695E10|
|IDC2-FAC-THAISARN...|  7.161759252775E10|
|            IDC2-PUE|  78720.49999999985|
+--------------------+-------------------+



In [9]:
import os

# JDBC URL of the SQL Server database `mek` on host.docker.internal:1433.
sqlserver_url = "jdbc:sqlserver://host.docker.internal:1433;databaseName=mek"

# JDBC connection properties for SQL Server.
# NOTE: this rebinds `properties`, replacing the PostgreSQL settings assigned
# earlier in the notebook.
# Credentials are taken from the environment (MEK_MSSQL_USER /
# MEK_MSSQL_PASSWORD) so real secrets never have to be committed with the
# notebook. The fallbacks preserve the previous hardcoded values and are only
# suitable for a local development database.
properties = {
    "user": os.environ.get("MEK_MSSQL_USER", "mek"),
    "password": os.environ.get("MEK_MSSQL_PASSWORD", "NewStrongPassword"),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    # SECURITY: "true" makes the driver accept the server's TLS certificate
    # without validating it. Acceptable for a local container; do not use
    # against a production server.
    "trustServerCertificate": "true",
}

In [10]:
# Write `df` to the `idc_btt` table over JDBC, using whatever `sqlserver_url`
# and `properties` are currently bound to. "overwrite" mode replaces any data
# already in that table.
df.write.mode("overwrite").jdbc(
    url=sqlserver_url,
    table="idc_btt",
    properties=properties,
)