In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hash, count, lit
import time

In [2]:
# Define the connection properties to SQL Server
jdbcHostname = "COGNINE-L143"
jdbcPort = 1433
jdbcDatabase = "Data"  # Replace with your database name
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase};trustServerCertificate=true"
connectionProperties = {
    "user": "Read",  # Replace with your username
    "password": "Welcome2cognine",  # Replace with your password
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [3]:
# Path to the JDBC driver JAR file
jdbc_driver_path = "C:\\sqljdbc_12.8\\enu\\jars\\mssql-jdbc-12.8.0.jre11.jar"

In [4]:
# Initialize the Spark session with the JDBC driver
spark = SparkSession.builder \
    .appName("StructuredStreamingWithSalting") \
    .config("spark.driver.extraClassPath", jdbc_driver_path) \
    .config("spark.executor.extraClassPath", jdbc_driver_path) \
    .getOrCreate()

In [5]:
def read_from_sql(max_id):
    query = f"(SELECT * FROM [220k_awards_by_directors] WHERE ID > {max_id}) AS data"  # Replace with your table and query
    df = spark.read \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", query) \
        .option("user", connectionProperties["user"]) \
        .option("password", connectionProperties["password"]) \
        .option("driver", connectionProperties["driver"]) \
        .load()
    return df

In [6]:
# Initialize max_id
max_id = 0
save_file_number = 0

In [7]:
# Define the base output path for the processed data
base_output_path = r"C:\Users\Manideep S\OneDrive - COGNINE\ML\Assessments\PySpark\DataSkew-Salting"  # Replace with the desired base output path

In [9]:
# Start streaming loop
while True:
    # Read the latest data from SQL Server
    new_data = read_from_sql(max_id)
    
    if new_data.count() > 0:
        # Add a salt column to the DataFrame using a hash of the director_name column
        salted_df = new_data.withColumn("salt", (hash(col("director_name")) % 10).cast("string"))
        
        # Perform processing tasks on salted_df
        # Example processing task 1: Filter rows where outcome is 'Won'
        filtered_df = salted_df.filter(col("outcome") == "Won")
        
        # Example processing task 2: Group by director_name and count the number of awards
        aggregated_df = filtered_df.groupBy("director_name").agg(count("*").alias("award_count"))
        
        # Example task: Add a new column that converts 'award_count' to string
        processed_df = aggregated_df.withColumn("award_count_str", col("award_count").cast("string"))
        
        # Define the output path for the current file
        output_path = f"{base_output_path}_{save_file_number}.parquet"
        
        # Write the processed DataFrame to a Parquet file
        processed_df.write.mode("overwrite").parquet(output_path)
        
        # Update max_id to the maximum ID from the newly read data
        max_id = new_data.agg({"ID": "max"}).collect()[0][0]
        
        # Increment the file number for the next save
        save_file_number += 1
    
    # Sleep for a specific interval before the next read
    time.sleep(10)


KeyboardInterrupt: 

In [19]:
import os
parquet_dir = r'C:\Users\Manideep S\OneDrive - COGNINE\ML\Assessments\PySpark'

In [20]:
# Read all Parquet files into a list of DataFrames
parquet_files = [f"{parquet_dir}/{file}" for file in os.listdir(parquet_dir) if file.endswith(".parquet")]

In [21]:
dataframes = [spark.read.parquet(file) for file in parquet_files]

In [23]:
c=0
for df in dataframes:
    print(c)
    df.printSchema()
    print("number of partitions: ",df.rdd.getNumPartitions())
    print("___________________________")
    c=c+1

0
root
 |-- director_name: string (nullable = true)
 |-- award_count: long (nullable = true)
 |-- award_count_str: string (nullable = true)

number of partitions:  1
___________________________
1
root
 |-- director_name: string (nullable = true)
 |-- award_count: long (nullable = true)
 |-- award_count_str: string (nullable = true)

number of partitions:  1
___________________________
2
root
 |-- director_name: string (nullable = true)
 |-- award_count: long (nullable = true)
 |-- award_count_str: string (nullable = true)

number of partitions:  1
___________________________
3
root
 |-- director_name: string (nullable = true)
 |-- award_count: long (nullable = true)
 |-- award_count_str: string (nullable = true)

number of partitions:  1
___________________________
4
root
 |-- director_name: string (nullable = true)
 |-- award_count: long (nullable = true)
 |-- award_count_str: string (nullable = true)

number of partitions:  1
___________________________
5
root
 |-- director_name: str

In [24]:
# Combine all DataFrames into a single DataFrame
combined_df = dataframes[0]
for df in dataframes[1:]:
    combined_df = combined_df.union(df)

In [25]:
# Show the schema and some rows to check the data
combined_df.printSchema()
combined_df.show()

root
 |-- director_name: string (nullable = true)
 |-- award_count: long (nullable = true)
 |-- award_count_str: string (nullable = true)

+------------------+-----------+---------------+
|     director_name|award_count|award_count_str|
+------------------+-----------+---------------+
|    Allison Anders|          7|              7|
|      Jim Jarmusch|         30|             30|
|        Rob Bowman|          1|              1|
|       John Milius|          3|              3|
|     Marcel Ophüls|         11|             11|
|     Paul Borghese|          5|              5|
|      Wayne Kramer|          1|              1|
|Pierre-Alain Meier|          1|              1|
|Christopher Monger|          7|              7|
|    Josef von Báky|          1|              1|
|       Tommy Chong|          1|              1|
|        Bobby Roth|          3|              3|
|  Terry Cunningham|          1|              1|
|     Robert Parigi|          3|              3|
|     Anne Goursaud|        

In [26]:
# Check for partitions
if combined_df.rdd.getNumPartitions() > 1:
    print(f"The DataFrame has {combined_df.rdd.getNumPartitions()} partitions.")
else:
    print("The DataFrame has only one partition.")

The DataFrame has 7 partitions.
