In [1]:
#/home/labuser/Documents/Level3/Day3/DataDemo/customers.parquet
#/home/labuser/Documents/Level3/Day3/DataDemo/bank_loans.parquet
#/home/labuser/Documents/Level3/Day3/DataDemo/branches.parquet
#/home/labuser/Documents/Level3/Day3/DataDemo/transactions.parquet

In [2]:
import pyarrow.parquet as pq
import pyarrow as pa
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

# Create (or reuse, via getOrCreate) the Spark session used throughout this notebook
spark = SparkSession.builder.appName("FixTimestampsAndJoin").getOrCreate()

# A threshold of -1 turns automatic broadcast joins off, so the customer/transaction
# join below runs as a shuffle-based join instead of broadcasting the smaller side
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Raw transactions file, plus the Spark-readable copy that the next step writes
DATA_DIR = "/home/labuser/Documents/Level3/Day3/DataDemo"
input_parquet = f"{DATA_DIR}/transactions.parquet"
output_parquet = f"{DATA_DIR}/transactions_fixed.parquet"

# Spark's Parquet reader does not accept INT64 TIMESTAMP(NANOS) columns, so the
# "timestamp" column is rewritten at millisecond precision before Spark reads the file.
def coerce_timestamp_to_millis(tbl: pa.Table, column: str = "timestamp") -> pa.Table:
    """Return ``tbl`` with ``column`` cast from ``timestamp[ns]`` to ``timestamp[ms]``.

    Only a nanosecond timestamp column is rewritten. If ``column`` is missing,
    duplicated, not a timestamp, or already in a coarser unit, ``tbl`` itself is
    returned unchanged. The cast keeps nulls, the column's timezone and the
    table's schema metadata; sub-millisecond digits are discarded.

    Args:
        tbl: Table read from Parquet.
        column: Name of the timestamp column to convert.

    Returns:
        A new table with the converted column, or ``tbl`` if nothing needed converting.
    """
    idx = tbl.schema.get_field_index(column)  # -1 when missing or not unique
    if idx == -1:
        return tbl
    field = tbl.schema.field(idx)
    if not (pa.types.is_timestamp(field.type) and field.type.unit == "ns"):
        # ms/us/s data must not be scaled as if it were nanoseconds
        return tbl

    print(f"⚠️ Converting {column} from {field.type} → TIMESTAMP(MILLIS)")
    millis_type = pa.timestamp("ms", tz=field.type.tz)
    # safe=False: ns → ms loses precision, which the default (safe) cast rejects;
    # the cast also handles every chunk and null in one call.
    millis_values = tbl.column(idx).cast(millis_type, safe=False)
    # set_column replaces just this column and leaves the rest of the schema intact
    return tbl.set_column(idx, field.with_type(millis_type), millis_values)


# Read the original Parquet file and fix its timestamp column
table = pq.read_table(input_parquet)
fixed_table = coerce_timestamp_to_millis(table)

# Write the corrected Parquet file
pq.write_table(fixed_table, output_parquet)

if fixed_table is table:
    print("ℹ️ No TIMESTAMP(NANOS) column to convert; data written unchanged")
else:
    print("✅ TIMESTAMP(NANOS) successfully converted to TIMESTAMP(MILLIS)")
print("Saved as:", output_parquet)

# ==============================
# Load into PySpark and Perform Joins
# ==============================

# Transactions come from the rewritten file produced by the conversion step above
transactions = spark.read.parquet(output_parquet)

# The customers file calls its key "CustomerID"; rename it to "customer_id" so both
# DataFrames share one join column name
customers_parquet = "/home/labuser/Documents/Level3/Day3/DataDemo/customers.parquet"
customers = (
    spark.read.parquet(customers_parquet)
    .withColumnRenamed("CustomerID", "customer_id")
)

# Baseline: time the inner join + per-customer transaction count with the settings
# configured so far (no explicit skew-join / bloom-filter tuning yet).
# NOTE(review): this is likely the first action on these DataFrames, so the number
# probably includes one-off start-up cost (file scans, JVM/JIT warm-up) — keep that in
# mind when comparing it with later runs of the same query.
# show() only needs the first 20 result rows, so Spark may skip part of the final
# stage; treat the timing as indicative rather than a full-cost benchmark.
customer_txn_counts = (
    transactions.join(customers, "customer_id", "inner")
    .groupBy("customer_id")
    .count()
)

# perf_counter is monotonic and high-resolution (unlike time.time, which can jump
# with system clock changes), so it is the right clock for elapsed durations
start_time = time.perf_counter()
customer_txn_counts.show()
end_time = time.perf_counter()

# Calculate execution time
execution_time = end_time - start_time
print(f"Execution Time : {execution_time:.2f} seconds")

⚠️ Converting timestamp from timestamp[ns] → TIMESTAMP(MILLIS)
✅ TIMESTAMP(NANOS) successfully converted to TIMESTAMP(MILLIS)
Saved as: /home/labuser/Documents/Level3/Day3/DataDemo/transactions_fixed.parquet
+-----------+-----+
|customer_id|count|
+-----------+-----+
|      10222|    3|
|      10371|    4|
|      11193|    2|
|      11378|    4|
|      11553|    4|
|      11568|    2|
|      11664|    5|
|      12285|    3|
|      12829|    8|
|      12830|    1|
|      13304|    6|
|      13670|    2|
|      13687|    3|
|      13694|    3|
|      14144|    4|
|      14846|    4|
|      15901|    4|
|      15953|    4|
|      16568|    7|
|      16749|    6|
+-----------+-----+
only showing top 20 rows

Execution Time : 5.82 seconds


In [3]:
# After Optimized Skew Joins & Bloom Filter Pushdown

# Skew-join splitting is an Adaptive Query Execution feature, so AQE must be on too
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Runtime bloom-filter join pruning (Spark 3.3+). The exact key spelling matters:
# spark.conf.set stores unknown keys without error, so a typo silently does nothing.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")

# Re-run the same inner join + per-customer count under the new settings
# NOTE(review): this query was already executed once in this session, so part of any
# speed-up is likely warm caches/JIT rather than these settings; compare the plans
# with .explain() or time both variants in a fresh session to confirm.
customer_txn_counts = (
    transactions.join(customers, "customer_id", "inner")
    .groupBy("customer_id")
    .count()
)

# perf_counter is monotonic, so it measures elapsed time reliably
start_time = time.perf_counter()
customer_txn_counts.show()
end_time = time.perf_counter()

# Calculate execution time
execution_time = end_time - start_time
print(f"Execution Time : {execution_time:.2f} seconds")

+-----------+-----+
|customer_id|count|
+-----------+-----+
|      10222|    3|
|      10371|    4|
|      11193|    2|
|      11378|    4|
|      11553|    4|
|      11568|    2|
|      11664|    5|
|      12285|    3|
|      12829|    8|
|      12830|    1|
|      13304|    6|
|      13670|    2|
|      13687|    3|
|      13694|    3|
|      14144|    4|
|      14846|    4|
|      15901|    4|
|      15953|    4|
|      16568|    7|
|      16749|    6|
+-----------+-----+
only showing top 20 rows

Execution Time : 1.03 seconds


In [4]:
# Show the active SparkSession as this cell's output (PySpark presumably renders an
# HTML summary in Jupyter, useful for locating the Spark UI to inspect the join jobs)
spark