## Solution A1~: MariaDB connection and ET

In [49]:
# - - - Import necessary libraries
import getpass
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import count, sum, max

In [50]:
# - - - Spark session creation - with JDBC driver classpath configuration

# Application details
appName = "App A1~: MariaDB connection and ET"
master = "local"

# This notebook talks to two databases, so two JDBC driver jars are required:
# the MariaDB connector (extract / load) and the SQL Server driver (Azure upload).
mariadb_driver_path = "C:/Spark/spark-3.5.1-bin-hadoop3/jars/mariadb-java-client-3.4.0.jar"
mssql_driver_path = "E:/Software_Lenovo/Development_Softwares/MariaDB_Spark/Microsoft JDBC Driver for SQL Server/sqljdbc_12.6.3.0_enu/sqljdbc_12.6/enu/jars/mssql-jdbc-12.6.3.jre8.jar"

# Put BOTH jars on the driver classpath. Classpath entries are separated by
# os.pathsep (';' on Windows, ':' elsewhere). Assigning the two paths to the
# same variable one after the other would keep only the last one.
jdbc_driver_path = os.pathsep.join([mariadb_driver_path, mssql_driver_path])

# Create Spark session (the classpath option is set exactly once)
spark_session = (
    SparkSession.builder
    .appName(appName)
    .master(master)
    .config("spark.driver.extraClassPath", jdbc_driver_path)
    .getOrCreate()
)

print("Details of the Spark session:")
print("Spark:", spark_session, "\n", "Spark version:", spark_session.version, "\n", "SparkContext:", spark_session.sparkContext, "\n", "SparkContext version:", spark_session.sparkContext.version)

Details of the Spark session:
Spark: <pyspark.sql.session.SparkSession object at 0x0000017CF3EB72C0> 
 Spark version: 3.5.1 
 SparkContext: <SparkContext master=local appName=App A1~: MariaDB connection and ET> 
 SparkContext version: 3.5.1


In [51]:
# - - - Read data from MariaDB using JDBC - Create DataFrame

# JDBC connection details (Parameters)
server = "localhost"
port = 3306
database = "nyse_db1_bdtt2"
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"

# Credentials are NOT stored in the notebook. They are read from the
# environment variables MARIADB_USER / MARIADB_PASSWORD; when the password
# variable is missing or empty the user is prompted for it interactively.
db_properties = {
    "user": os.environ.get("MARIADB_USER", "root"),
    "password": os.environ.get("MARIADB_PASSWORD") or getpass.getpass("MariaDB password: "),
    "driver": "org.mariadb.jdbc.Driver"
}

# SQL query
sql = "select * from stock_prices_nyse"

# Read data from MariaDB using JDBC with the values of the properties dictionary
df1_nyse = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("query", sql) \
    .option("user", db_properties['user']) \
    .option("password", db_properties['password']) \
    .option("driver", db_properties['driver']) \
    .load()

In [52]:
# printSchema() writes the schema to stdout itself and returns None, so it is
# called on its own line instead of being passed to print() (which would
# print the label followed by a stray "None").
print("Information about the DataFrame - Schema:")
df1_nyse.printSchema()

root
 |-- date: date (nullable = true)
 |-- symbol: string (nullable = true)
 |-- open: decimal(15,10) (nullable = true)
 |-- close: decimal(15,10) (nullable = true)
 |-- low: decimal(15,10) (nullable = true)
 |-- high: decimal(15,10) (nullable = true)
 |-- volume: integer (nullable = true)
 |-- country: string (nullable = true)

Information about the DataFrame - Schema: None


In [53]:
# Report the DataFrame dimensions and the number of distinct symbols it contains.
shape_nyse = (df1_nyse.count(), len(df1_nyse.columns))
distinct_symbol_total = df1_nyse.select("symbol").distinct().count()

print('Number of rows & columns in dataframe: ', shape_nyse)
print("Number of distinct symbols (Companies): ", distinct_symbol_total)

Number of rows & columns in dataframe:  (849125, 8)
Number of distinct symbols (Companies):  498


In [54]:
# Explore the dataframe
# show() prints the rows itself and returns None; print the label first and
# call show() separately so the label precedes the table and no "None" appears.
print("First 15 rows of the dataframe:")
df1_nyse.show(15)

+----------+------+--------------+--------------+--------------+--------------+---------+-----------+
|      date|symbol|          open|         close|           low|          high|   volume|    country|
+----------+------+--------------+--------------+--------------+--------------+---------+-----------+
|2010-01-04|     A| 31.3899990000| 31.3000010000| 31.1300000000| 31.6300010000|  3815500|USA\r      |
|2010-01-04|   AAL|  4.8400000000|  4.7700000000|  4.6600000000|  4.9400000000|  9837300|USA\r      |
|2010-01-04|   AAP| 40.7000010000| 40.3800010000| 40.3600010000| 41.0400010000|  1701700|USA\r      |
|2010-01-04|  AAPL|213.4299980000|214.0099980000|212.3800010000|214.4999960000|123432400|USA\r      |
|2010-01-04|   ABC| 26.2900010000| 26.6299990000| 26.1399990000| 26.6900010000|  2455900|USA\r      |
|2010-01-04|   ABT| 54.1899530000| 54.4599510000| 53.9199510000| 54.5599540000| 10829000|USA\r      |
|2010-01-04|   ACN| 41.5200000000| 42.0700000000| 41.5000000000| 42.2000010000|  3

In [55]:
# Only the 55th row is needed, so fetch just the first 55 rows with take()
# instead of collecting the whole column to the driver.
print("country column value for 55th record:", df1_nyse.select("country").take(55)[54])

country column value for 55th record: Row(country='USA\r      ')


In [56]:
# Clean the 'country' column: remove the carriage return (\r) and the padding spaces.
# trim() on its own only strips leading & trailing space characters, so the "\r"
# has to be removed explicitly with regexp_replace. The "\r" is removed FIRST so
# that trim() can also strip spaces that were sitting in front of it.
df1_nyse = df1_nyse.withColumn("country", F.trim(F.regexp_replace(F.col("country"), "\r", "")))

# show() prints the table itself and returns None, so it is not passed to print().
print("First 15 rows of the dataframe after removing newline character from 'country' column:")
df1_nyse.show(15)

+----------+------+--------------+--------------+--------------+--------------+---------+-------+
|      date|symbol|          open|         close|           low|          high|   volume|country|
+----------+------+--------------+--------------+--------------+--------------+---------+-------+
|2010-01-04|     A| 31.3899990000| 31.3000010000| 31.1300000000| 31.6300010000|  3815500|    USA|
|2010-01-04|   AAL|  4.8400000000|  4.7700000000|  4.6600000000|  4.9400000000|  9837300|    USA|
|2010-01-04|   AAP| 40.7000010000| 40.3800010000| 40.3600010000| 41.0400010000|  1701700|    USA|
|2010-01-04|  AAPL|213.4299980000|214.0099980000|212.3800010000|214.4999960000|123432400|    USA|
|2010-01-04|   ABC| 26.2900010000| 26.6299990000| 26.1399990000| 26.6900010000|  2455900|    USA|
|2010-01-04|   ABT| 54.1899530000| 54.4599510000| 53.9199510000| 54.5599540000| 10829000|    USA|
|2010-01-04|   ACN| 41.5200000000| 42.0700000000| 41.5000000000| 42.2000010000|  3650100|    USA|
|2010-01-04|  ADBE| 

In [57]:
# tail(n) returns the last n rows as a list of Row objects.
# The row count is kept in one variable so the printed label always matches
# the number of rows actually fetched (the label used to say 15 while 5 were shown).
n_tail_rows = 5
print(f"Last {n_tail_rows} rows of the dataframe after removing newline character from 'country' column:\n", df1_nyse.tail(n_tail_rows))

# print("Last row of the dataframe after removing newline character from 'country' column:", df1_nyse.tail(1)[0].asDict())

Last 15 rows of the dataframe after removing newline character from 'country' column:
 [Row(date=datetime.date(2016, 12, 30), symbol='YHOO', open=Decimal('38.7200010000'), close=Decimal('38.6699980000'), low=Decimal('38.4300000000'), high=Decimal('39.0000000000'), volume=6431600, country='USA'), Row(date=datetime.date(2016, 12, 30), symbol='YUM', open=Decimal('63.9300000000'), close=Decimal('63.3300020000'), low=Decimal('63.1600000000'), high=Decimal('63.9399990000'), volume=1887100, country='USA'), Row(date=datetime.date(2016, 12, 30), symbol='ZBH', open=Decimal('103.3099980000'), close=Decimal('103.1999970000'), low=Decimal('102.8499980000'), high=Decimal('103.9300000000'), volume=973800, country='USA'), Row(date=datetime.date(2016, 12, 30), symbol='ZION', open=Decimal('43.0700000000'), close=Decimal('43.0400010000'), low=Decimal('42.6899990000'), high=Decimal('43.3100010000'), volume=1938100, country='USA'), Row(date=datetime.date(2016, 12, 30), symbol='ZTS', open=Decimal('53.639999

In [58]:
# Total volume traded for each symbol (Company) in descending order.
# F.sum is referenced through the module alias so it cannot be confused with
# Python's built-in sum().
total_volume_desc = (
    df1_nyse
    .groupBy("symbol")
    .agg(F.sum("volume").alias("total_volume"))
    .orderBy(F.desc("total_volume"))
)

# show() prints the table itself and returns None, so it is not passed to print().
print("Total volume traded for each symbol in descending order: ")
total_volume_desc.show(30)

+------+------------+
|symbol|total_volume|
+------+------------+
|   BAC|250885842000|
|  AAPL|166025817100|
|     F| 86958719600|
|    GE| 85548026700|
|  MSFT| 80695788200|
|  INTC| 74084178800|
|  CSCO| 73979560500|
|   PFE| 65535429500|
|   HPQ| 63072337400|
|     C| 59393711100|
|    MU| 55338938000|
|  NFLX| 48251073600|
|     T| 46424074200|
|   JPM| 45932705600|
|   WFC| 45282059200|
|  EBAY| 44952852700|
|   FCX| 40659626600|
|    FB| 40327946900|
|  ORCL| 39361272500|
|  YHOO| 35185494200|
|   CHK| 34911289700|
|    RF| 33994155700|
|  FOXA| 30032646800|
|    VZ| 28986522900|
|   XOM| 28754969900|
|    KO| 28050558500|
|  AMAT| 27545989300|
|     V| 26726333500|
|   BSX| 26577688500|
| CMCSA| 26161692500|
+------+------------+
only showing top 30 rows

Total volume traded for each symbol in descending order:  None


In [59]:
# Smallest and largest value of the 'date' column, collected to the driver
# as a one-element list of Row.
date_bounds = df1_nyse.agg(F.min("date"), F.max("date"))
date_range = date_bounds.collect()
print("Date range of the date column: ", date_range)

Date range of the date column:  [Row(min(date)=datetime.date(2010, 1, 4), max(date)=datetime.date(2016, 12, 30))]


#### Transform (Preprocess) the Stock data

In [60]:
# Count the NULL entries of every column in a single select:
# when() yields a non-null value only for NULL cells, and count() counts those.
print("Number of null values in each column:")
null_count_exprs = [
    count(F.when(F.col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df1_nyse.columns
]
df1_nyse.select(null_count_exprs).show()

Number of null values in each column:
+----+------+----+-----+---+----+------+-------+
|date|symbol|open|close|low|high|volume|country|
+----+------+----+-----+---+----+------+-------+
|   0|     0|   0|    0|  0|   0|     0|      0|
+----+------+----+-----+---+----+------+-------+



In [61]:
# Order the rows by symbol first and by date second, both ascending
df1_nyse = df1_nyse.orderBy(F.col("symbol").asc(), F.col("date").asc())

In [62]:
# show() prints the table itself and returns None; print the label first and
# call show() separately so the label precedes the table and no "None" appears.
print("First 30 rows of the dataframe after sorting by symbol and date in ascending order:")
df1_nyse.show(30)

+----------+------+-------------+-------------+-------------+-------------+--------+-------+
|      date|symbol|         open|        close|          low|         high|  volume|country|
+----------+------+-------------+-------------+-------------+-------------+--------+-------+
|2010-01-04|     A|31.3899990000|31.3000010000|31.1300000000|31.6300010000| 3815500|    USA|
|2010-01-05|     A|31.2099990000|30.9600010000|30.7600000000|31.2200010000| 4186000|    USA|
|2010-01-06|     A|30.8500010000|30.8500010000|30.7600000000|31.0000010000| 3243700|    USA|
|2010-01-07|     A|30.7800010000|30.8099990000|30.5000000000|30.8200010000| 3095100|    USA|
|2010-01-08|     A|30.6400000000|30.8000000000|30.3999990000|30.8500010000| 3733900|    USA|
|2010-01-11|     A|30.8799990000|30.8200010000|30.6700010000|31.0500000000| 4781500|    USA|
|2010-01-12|     A|30.5600010000|30.4500010000|30.2199990000|30.6500000000| 2871000|    USA|
|2010-01-13|     A|30.4699990000|30.6899990000|30.0500010000|30.780001

In [63]:
# Number of rows per symbol, largest count first
rows_per_symbol = df1_nyse.groupBy("symbol").count()
symbol_count = rows_per_symbol.orderBy(F.col("count").desc())

# Display every symbol together with its number of rows
# (the collected list is the cell's last expression, so the notebook renders it)
print("All symbols with the number of rows:")
symbol_count.collect()

All symbols with the number of rows:


[Row(symbol='ALXN', count=1762),
 Row(symbol='GIS', count=1762),
 Row(symbol='K', count=1762),
 Row(symbol='LEN', count=1762),
 Row(symbol='SPGI', count=1762),
 Row(symbol='AVY', count=1762),
 Row(symbol='MMM', count=1762),
 Row(symbol='PKI', count=1762),
 Row(symbol='PPG', count=1762),
 Row(symbol='RF', count=1762),
 Row(symbol='AXP', count=1762),
 Row(symbol='CI', count=1762),
 Row(symbol='IRM', count=1762),
 Row(symbol='WEC', count=1762),
 Row(symbol='PFG', count=1762),
 Row(symbol='PM', count=1762),
 Row(symbol='SNA', count=1762),
 Row(symbol='BLK', count=1762),
 Row(symbol='EA', count=1762),
 Row(symbol='ESRX', count=1762),
 Row(symbol='OXY', count=1762),
 Row(symbol='DUK', count=1762),
 Row(symbol='ULTA', count=1762),
 Row(symbol='HAS', count=1762),
 Row(symbol='MTD', count=1762),
 Row(symbol='TROW', count=1762),
 Row(symbol='FLIR', count=1762),
 Row(symbol='MAT', count=1762),
 Row(symbol='XL', count=1762),
 Row(symbol='EMN', count=1762),
 Row(symbol='KIM', count=1762),
 Row(symb

In [64]:
# Identify the symbols with fewer than 1000 rows; they are to be dropped from df1_nyse
sparse_symbols = symbol_count.filter("count < 1000")
symbols_to_drop = sparse_symbols.select("symbol").collect()
print("Symbols to drop:", symbols_to_drop)

# Report how many symbols, and how many rows in total, will be dropped
print("Number of symbols to drop:", len(symbols_to_drop))
rows_to_drop = sparse_symbols.select("count").agg(F.sum("count")).collect()[0][0]
print("Total rows to be dropped:", rows_to_drop)

Symbols to drop: [Row(symbol='ZTS'), Row(symbol='COTY'), Row(symbol='MNK'), Row(symbol='NWS'), Row(symbol='NWSA'), Row(symbol='EVHC'), Row(symbol='ALLE'), Row(symbol='QRVO'), Row(symbol='SYF'), Row(symbol='CFG'), Row(symbol='NAVI'), Row(symbol='WRK'), Row(symbol='PYPL'), Row(symbol='KHC'), Row(symbol='HPE'), Row(symbol='CSRA')]
Number of symbols to drop: 16
Total rows to be dropped: 9945


In [65]:
# Remove every row whose symbol is listed in symbols_to_drop and report the
# DataFrame dimensions before and after the removal.
shape_before = (df1_nyse.count(), len(df1_nyse.columns))
print("Number of rows & columns in the dataframe before dropping the symbols with less than 1000 rows: ", shape_before)

dropped_symbol_names = [row.symbol for row in symbols_to_drop]
df1_nyse = df1_nyse.filter(~F.col("symbol").isin(dropped_symbol_names))

shape_after = (df1_nyse.count(), len(df1_nyse.columns))
print("Number of rows & columns in the dataframe after dropping the symbols with less than 1000 rows: ", shape_after)

Number of rows & columns in the dataframe before dropping the symbols with less than 1000 rows:  (849125, 8)
Number of rows & columns in the dataframe after dropping the symbols with less than 1000 rows:  (839180, 8)


In [66]:
# - - - Load: write the dataframe df1_nyse to the MariaDB table stock_prices_nyse_transformed

# JDBC writer settings, taken from the connection URL and the properties dictionary
mariadb_write_options = {
    "url": jdbc_url,
    "dbtable": "stock_prices_nyse_transformed",
    "user": db_properties['user'],
    "password": db_properties['password'],
    "driver": db_properties['driver'],
}

# Save with mode "overwrite"
df1_nyse.write.format("jdbc").options(**mariadb_write_options).mode("overwrite").save()

print("Data written to MariaDB table stock_prices_nyse_transformed")

Data written to MariaDB table stock_prices_nyse_transformed


In [67]:
# print("Stopping the Spark session", spark_session.stop())

### Upload the Transformed data to Azure SQL Database

In [68]:
# # - - - Spark session creation - with JDBC driver path configuration
# 
# # Application details
# appName = "App2 A1~: Upload data into Azure SQL Database"
# master = "local"
# jdbc_driver_path = "E:/Software_Lenovo/Development_Softwares/MariaDB_Spark/Microsoft JDBC Driver for SQL Server/sqljdbc_12.6.3.0_enu/sqljdbc_12.6/enu/jars/mssql-jdbc-12.6.3.jre8.jar"
# 
# # Create Spark session
# spark_session = SparkSession.builder \
#     .appName(appName) \
#     .master(master) \
#     .config("spark.driver.extraClassPath", jdbc_driver_path) \
#     .getOrCreate()
# 
# print("Details of the Spark session:")
# print("Spark:", spark_session, "\n", "Spark version:", spark_session.version, "\n", "SparkContext:", spark_session.sparkContext, "\n", "SparkContext version:", spark_session.sparkContext.version)

In [69]:
# - - - Azure SQL Database connection settings for uploading the transformed data
#
# Steps carried out with these settings:
# 1. Create a new JDBC connection to the Azure SQL Database.
# 2. Write the transformed data to the Azure SQL Database table.
# 3. Verify the data in the Azure SQL Database table.

# JDBC connection details (Parameters)
server = "alinizarserver1.database.windows.net"
port = 1433
database = "myfirstdatabase"

# Credentials are NOT stored in the notebook. They are read from the
# environment variables AZURE_SQL_USER / AZURE_SQL_PASSWORD; when the password
# variable is missing or empty the user is prompted for it interactively.
db_azure_properties = {
    "user": os.environ.get("AZURE_SQL_USER", "azureuser@alinizarserver1"),
    "password": os.environ.get("AZURE_SQL_PASSWORD") or getpass.getpass("Azure SQL Database password: "),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# The URL is built from the parameters above. `server` already holds the fully
# qualified host name, so no domain suffix is appended. User and password are
# deliberately left out of the URL: they are supplied separately from
# db_azure_properties as JDBC options, which keeps the secret out of the URL string.
jdbc_url_azure = (
    f"jdbc:sqlserver://{server}:{port};"
    f"database={database};"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
)


In [70]:
# Write df1_nyse to the Azure SQL Database table stock_prices_nyse_transformed
# JDBC writer settings, taken from the Azure URL and the Azure properties dictionary
azure_write_options = {
    "url": jdbc_url_azure,
    "dbtable": "stock_prices_nyse_transformed",
    "user": db_azure_properties['user'],
    "password": db_azure_properties['password'],
    "driver": db_azure_properties['driver'],
}

# Save with mode "overwrite"
df1_nyse.write.format("jdbc").options(**azure_write_options).mode("overwrite").save()

print("Data written into the table stock_prices_nyse_transformed of Azure SQL Database")


Data written into the table stock_prices_nyse_transformed of Azure SQL Database


In [71]:
# Verify the data in the Azure SQL Database table by reading it back over JDBC
df2_azure = spark_session.read.format("jdbc") \
    .option("url", jdbc_url_azure) \
    .option("dbtable", "stock_prices_nyse_transformed") \
    .option("user", db_azure_properties['user']) \
    .option("password", db_azure_properties['password']) \
    .option("driver", db_azure_properties['driver']) \
    .load()

print("Data read from Azure SQL Database table stock_prices_nyse_transformed")

# printSchema() writes the schema to stdout itself and returns None, so it is
# called on its own line instead of being passed to print().
print("Schema of the dataframe read from Azure SQL Database table stock_prices_nyse_transformed:")
df2_azure.printSchema()


Data read from Azure SQL Database table stock_prices_nyse_transformed
root
 |-- date: date (nullable = true)
 |-- symbol: string (nullable = true)
 |-- open: decimal(15,10) (nullable = true)
 |-- close: decimal(15,10) (nullable = true)
 |-- low: decimal(15,10) (nullable = true)
 |-- high: decimal(15,10) (nullable = true)
 |-- volume: integer (nullable = true)
 |-- country: string (nullable = true)

Schema of the dataframe read from Azure SQL Database table stock_prices_nyse_transformed:
 None
+----------+------+-------------+-------------+-------------+-------------+-------+-------+
|      date|symbol|         open|        close|          low|         high| volume|country|
+----------+------+-------------+-------------+-------------+-------------+-------+-------+
|2010-01-04|     A|31.3899990000|31.3000010000|31.1300000000|31.6300010000|3815500|    USA|
|2010-01-05|     A|31.2099990000|30.9600010000|30.7600000000|31.2200010000|4186000|    USA|
|2010-01-06|     A|30.8500010000|30.850001

In [73]:
# show() prints the table itself and returns None; print the label first and
# call show() separately so the label precedes the table and no "None" appears.
print("First 15 rows of the dataframe read from Azure SQL Database table stock_prices_nyse_transformed:")
df2_azure.show(15)

+----------+------+-------------+-------------+-------------+-------------+-------+-------+
|      date|symbol|         open|        close|          low|         high| volume|country|
+----------+------+-------------+-------------+-------------+-------------+-------+-------+
|2010-01-04|     A|31.3899990000|31.3000010000|31.1300000000|31.6300010000|3815500|    USA|
|2010-01-05|     A|31.2099990000|30.9600010000|30.7600000000|31.2200010000|4186000|    USA|
|2010-01-06|     A|30.8500010000|30.8500010000|30.7600000000|31.0000010000|3243700|    USA|
|2010-01-07|     A|30.7800010000|30.8099990000|30.5000000000|30.8200010000|3095100|    USA|
|2010-01-08|     A|30.6400000000|30.8000000000|30.3999990000|30.8500010000|3733900|    USA|
|2010-01-11|     A|30.8799990000|30.8200010000|30.6700010000|31.0500000000|4781500|    USA|
|2010-01-12|     A|30.5600010000|30.4500010000|30.2199990000|30.6500000000|2871000|    USA|
|2010-01-13|     A|30.4699990000|30.6899990000|30.0500010000|30.7800010000|34189

In [74]:
# Row count of the table read back from Azure SQL Database
azure_row_total = df2_azure.count()
print("Total number of rows in the dataframe read from Azure SQL Database table stock_prices_nyse_transformed: ", azure_row_total)

Total number of rows in the dataframe read from Azure SQL Database table stock_prices_nyse_transformed:  839180


In [75]:
# stop() returns None, so it is called on its own line rather than inside
# print() (which would print "Stopping the Spark session None").
print("Stopping the Spark session")
spark_session.stop()

Stopping the Spark session None
