In [1]:
import os

# Set SPARK_HOME and JAVA_HOME environment variables
os.environ['SPARK_HOME'] = '/usr/local/Cellar/apache-spark/3.5.1/libexec'
os.environ['JAVA_HOME'] = '/usr/local/opt/openjdk/libexec/openjdk.jdk/Contents/Home'

## Common Spark Error in Real-Time Project

### Task:

Task: Change the data type of the column processed_datetime from DATE to TIMESTAMP

Error: 
error reading the parquet file at s3://big-data-project/test-schema/test-table/load_ts=20240731160000
column "processed_datetime" expected TIMESTAMP but found INT4

### Error Context

The error arises when the data type in the Parquet files does not match the expected schema in the table. In this case, the `processed_datetime` column is expected to be a `TIMESTAMP`, but the data in the partitions is still in `DATE` format. This mismatch often occurs because of changes in schema definitions in partitioned tables.


### Steps to Rectify the Error

1. **Identify the Mismatched Partitions:**
   - Check which partitions are using the old data type. In the blog post context, this would mean identifying which partitions still have the `DATE` type instead of the `TIMESTAMP`.

2. **Update the Schema of Existing Partitions:**
   - Alter the data type of columns in the partitions. This could involve manually altering the schema or rewriting the data.

3. **Drop the Old Partitions (if applicable):**
   - If partitions are not needed or can be reprocessed, drop them. This is often easier in sandbox or development environments.

4. **Reprocess the Data (if needed):**
   - Delete the data in S3 and reprocess it from the source. This ensures that all data follows the new schema and eliminates schema mismatch issues.


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp

# Initialize Spark session
spark = SparkSession.builder.appName("SparkDataTypeChange").getOrCreate()

# Sample data with DATE type column
data = [("2023-07-31", "data1"), ("2023-07-30", "data2"), ("2023-07-29", "data3")]
schema = ["processed_datetime", "data"]

# Create DataFrame
df = spark.createDataFrame(data, schema)
df = df.withColumn("processed_datetime", to_date(col("processed_datetime"), "yyyy-MM-dd"))

# Save DataFrame as Parquet with partitioning
df.write.mode("overwrite").partitionBy("processed_datetime").parquet("Pyspark_learning/Issues-learning /Typecast_partitioned_table/data/processed_date")

# Read back the Parquet files to simulate the original scenario
df_loaded = spark.read.parquet("Pyspark_learning/Issues-learning /Typecast_partitioned_table/data/processed_date")
df_loaded.show()


+-----+------------------+
| data|processed_datetime|
+-----+------------------+
|data2|        2023-07-30|
|data1|        2023-07-31|
|data3|        2023-07-29|
+-----+------------------+



Now, let's handle the error by modifying the partitions and updating the data type.

Option 1: Alter the data type of columns in the partitions

In [11]:
# Altering the schema to TIMESTAMP (simulated)
# Recreate DataFrame with TIMESTAMP type
df_timestamp = df.withColumn("processed_datetime", to_timestamp(col("processed_datetime"), "yyyy-MM-dd"))

# Overwrite the existing partition with the new TIMESTAMP type
df_timestamp.write.mode("overwrite").partitionBy("processed_datetime").parquet("Pyspark_learning/Issues-learning /Typecast_partitioned_table/data/processed_date")

# Read the updated Parquet files to verify
df_loaded_updated = spark.read.parquet("Pyspark_learning/Issues-learning /Typecast_partitioned_table/data/processed_date")
df_loaded_updated.show()

+-----+-------------------+
| data| processed_datetime|
+-----+-------------------+
|data3|2023-07-29 00:00:00|
|data1|2023-07-31 00:00:00|
|data2|2023-07-30 00:00:00|
+-----+-------------------+



### Dropping Partitions

To drop partitions, you can use a SQL command to alter the table and drop specific partitions. This approach assumes you're working with a managed table in Spark SQL. If using DataFrames directly, you may need to delete the relevant files.

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DropPartitionsExample").getOrCreate()

# Assuming you have a managed table
table_name = "your_table_name"
partition_column = "processed_datetime"
partition_value_to_drop = "2023-07-31"  # Example partition value

# Drop specific partition
spark.sql(f"ALTER TABLE {table_name} DROP IF EXISTS PARTITION ({partition_column} = '{partition_value_to_drop}')")

Deleting and Reprocessing Data


To delete data from S3 and reprocess it, you can use Spark to delete the old data and then write new data.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp

# Initialize Spark session
spark = SparkSession.builder.appName("DeleteAndReprocessData").getOrCreate()

# Path to the old data
old_data_path = "s3://your-bucket/path/to/old/data"

# Delete old data
import shutil
import os

# Local file system simulation (adjust for S3 using boto3 or other methods)
shutil.rmtree(old_data_path, ignore_errors=True)

# Sample data with new TIMESTAMP type column
data = [("2023-07-31 00:00:00", "data1"), ("2023-07-30 00:00:00", "data2"), ("2023-07-29 00:00:00", "data3")]
schema = ["processed_datetime", "data"]

# Create DataFrame with TIMESTAMP type
df = spark.createDataFrame(data, schema)
df = df.withColumn("processed_datetime", to_timestamp(col("processed_datetime"), "yyyy-MM-dd HH:mm:ss"))

# Save DataFrame as Parquet with partitioning
df.write.mode("overwrite").partitionBy("processed_datetime").parquet("s3://your-bucket/path/to/new/data")


Schema Evolution
Schema evolution is a feature of some data storage formats and platforms, like Delta Lake or Apache Hudi.
For standard Parquet files and Spark, you might not have direct schema evolution support, but you can manually handle schema updates.

Using Delta Lake (an example for schema evolution):

In [None]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SchemaEvolutionExample") \
    .getOrCreate()

# Path to Delta table
delta_table_path = "s3://your-bucket/path/to/delta_table"

# Convert to Delta format (if not already)
df.write.format("delta").mode("overwrite").save(delta_table_path)

# Read Delta table
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Alter the schema (add new column example)
delta_table.alterTableAddColumn("new_column STRING")

# Upsert data with new schema
from pyspark.sql.functions import lit

new_data = [( "2023-08-01 00:00:00", "data4", "new_value")]
new_df = spark.createDataFrame(new_data, ["processed_datetime", "data", "new_column"])

new_df.write.format("delta").mode("append").save(delta_table_path)
