In [18]:
!pip install pandas xlsxwriter

[0m

In [19]:
from pyspark.sql import SparkSession
# Attach to the already-running Spark session if there is one; otherwise
# start a new session registered under the application name "records".
spark = (
    SparkSession.builder
    .appName("records")
    .getOrCreate()
)


In [20]:
# Print the catalogs visible to this Spark session.
catalog_listing = spark.sql("SHOW CATALOGS")
catalog_listing.show()


+-------------+
|      catalog|
+-------------+
|         demo|
|spark_catalog|
+-------------+



In [9]:
# spark.sql("DROP TABLE IF EXISTS nyc.taxis_10M_50COLUMNS")


In [21]:
 # Drop the benchmark table if it already exists so the table can be created
 # again from scratch; IF EXISTS makes this a no-op when the table is absent.
 spark.sql("DROP TABLE IF EXISTS demo.nyc.taxis_10M_50")

DataFrame[]

In [22]:
import os
# Filesystem locations (relative to the notebook) of the Iceberg table whose
# on-disk footprint is measured, split into its metadata and data folders.
iceberg_table_dir = "../warehouse/nyc/taxis_10M_50"
metadata_dir = iceberg_table_dir + "/metadata"
data_dir = iceberg_table_dir + "/data"

# Root folder holding the input files to ingest.
input_data_dir = "../input_data"

# Accumulators: one result record per operation, and the running row count.
analysis_info = list()
records_before_op = 0

def append_to_file(file_path, msg):
    """Append one record to a CSV file, emitting a header row when needed.

    Args:
        file_path: Path of the CSV file. It is created if it does not exist.
        msg: Mapping of column name -> value. The keys are written as the
            header row when the file is missing or empty; the values are
            written as one data row, in the mapping's iteration order.
    """
    # Imported here so the helper works even when the cell that imports csv
    # at module level has not been executed yet.
    import csv

    # A header is required for a brand-new file and also for a file that
    # exists but is still empty (otherwise the report would have no header).
    needs_header = (
        not os.path.exists(file_path) or os.path.getsize(file_path) == 0
    )

    # Append mode creates the file when missing. newline="" lets the csv
    # module control line endings itself, which avoids blank lines between
    # rows on platforms that translate "\n".
    with open(file_path, "a", newline="") as file:
        writer = csv.writer(file)

        if needs_header:
            writer.writerow(list(msg.keys()))

        writer.writerow(list(msg.values()))

def get_size(metadata_path=None, data_path=None):
    """Measure the on-disk footprint of the table's data and metadata files.

    Args:
        metadata_path: Directory holding the table metadata files. Defaults
            to the module-level ``metadata_dir``.
        data_path: Directory holding the table data files. Defaults to the
            module-level ``data_dir``.

    Returns:
        dict with sizes in KB (bytes / 1024):
            ``data_dir_size``  - every file under the data directory,
                                 including files in nested subdirectories;
            ``metadata_size``  - files ending in ``.metadata.json``;
            ``snapshot_size``  - files named ``snap-*.avro``;
            ``manifest_size``  - files matching ``*-m<digits>.avro``.
        A data directory that does not exist contributes 0 KB.

    Raises:
        FileNotFoundError: if the metadata directory does not exist.
    """
    # Local import: the function needs re, and importing it here keeps the
    # helper self-contained regardless of which cells ran before it.
    import re

    if metadata_path is None:
        metadata_path = metadata_dir
    if data_path is None:
        data_path = data_dir

    manifest_pattern = re.compile(r".*-m\d+\.avro$")

    # Walk the data directory recursively so that files inside nested
    # folders are counted by their real size instead of the size of the
    # directory entry itself.
    data_dir_size = 0
    for root, _subdirs, filenames in os.walk(data_path):
        for filename in filenames:
            data_dir_size += os.path.getsize(os.path.join(root, filename)) / 1024

    snap_avro_size = 0
    metadata_json_size = 0
    m_avro_size = 0

    # Bucket each metadata file by its naming convention. The snapshot check
    # comes first so a "snap-..." file is never counted as a manifest.
    for file in os.listdir(metadata_path):
        file_size_kb = os.path.getsize(os.path.join(metadata_path, file)) / 1024

        if file.startswith("snap-") and file.endswith(".avro"):
            snap_avro_size += file_size_kb
        elif file.endswith(".metadata.json"):
            metadata_json_size += file_size_kb
        elif manifest_pattern.match(file):
            m_avro_size += file_size_kb

    return {
        "data_dir_size": data_dir_size,
        "metadata_size": metadata_json_size,
        "snapshot_size": snap_avro_size,
        "manifest_size": m_avro_size,
    }


In [23]:
from pyspark.sql.types import (
    DoubleType, FloatType, LongType, StructType, StructField, 
    StringType, IntegerType, DateType
)

# Table schema: 50 nullable columns named extra_col_0 .. extra_col_49 whose
# types repeat in a four-column cycle of string, int, string, date.
# The named taxi columns (vendor_id, trip_id, trip_distance, fare_amount,
# store_and_fwd_flag) are not part of this schema.
_COLUMN_TYPE_CYCLE = (StringType, IntegerType, StringType, DateType)

schema = StructType([
    StructField(f"extra_col_{i}", _COLUMN_TYPE_CYCLE[i % 4](), True)
    for i in range(50)
])

# A DataFrame with no rows, used only to carry the schema.
df = spark.createDataFrame([], schema)

# Create the Iceberg table from that schema.
df.writeTo("demo.nyc.taxis_10M_50").create()



In [24]:
# df = spark.table("demo.nyc.taxis_10M_50COLUMNS")
# df.show()

In [25]:
import time, csv
from pyspark.sql.functions import col, when
from pyspark.sql import functions as F
import os

# Ingestion benchmark: append every input file to the Iceberg table one at a
# time and, after each append, record the elapsed time and the on-disk
# data/metadata sizes (via get_size) as one row of a CSV report.

TABLE_NAME = "demo.nyc.taxis_10M_50"
SUPPORTED_FILE_TYPES = ("csv", "parquet")

input_data_dir = "../input_data"
output_dir = "../output"
analysis_info = []
records_before_op = 0
total_insertion_time = 0

file_type = input("Enter input file type csv or parquet? : ")
file_type = file_type.lower().strip()
# Fail early with a clear message instead of silently treating any unknown
# answer as "csv" and then failing on a missing directory.
if file_type not in SUPPORTED_FILE_TYPES:
    raise ValueError(
        f"Unsupported input file type {file_type!r}; expected one of {SUPPORTED_FILE_TYPES}"
    )

input_data_dir = os.path.join(input_data_dir, file_type)
# os.listdir returns entries in arbitrary order; sort for a reproducible run
# and skip hidden entries (e.g. .ipynb_checkpoints, .crc side files).
input_files = sorted(
    name for name in os.listdir(input_data_dir) if not name.startswith(".")
)

# The report is rewritten on every run; make sure its folder exists first.
os.makedirs(output_dir, exist_ok=True)
analysis_file = os.path.join(output_dir, f"analysis_info_{file_type}.csv")
if os.path.exists(analysis_file):
    os.remove(analysis_file)

df = spark.table(TABLE_NAME)
# Column names and types of the target table, used to cast CSV input.
target_fields = df.schema.fields
records_before_op = df.count()

digits = len(str(records_before_op))

for file in input_files:
    print(f"Started with file={file}")
    file_path = os.path.join(input_data_dir, file)

    st = time.time()
    if file_type == "parquet":
        df = spark.read.parquet(file_path)
    else:
        # CSV columns are read as strings. Cast each one to the type of the
        # same-named column in the target table so the selection always
        # matches the table schema (a hard-coded column list can drift from
        # the table definition and make the append fail).
        df = spark.read.csv(file_path, header=True)
        df = df.select(
            *[F.col(field.name).cast(field.dataType) for field in target_fields]
        )

    # NOTE: this count runs inside the timed section, so the recorded time
    # covers reading/counting the input as well as the append itself.
    rows = df.count()

    df.writeTo(TABLE_NAME).append()
    end = time.time() - st
    total_insertion_time += end

    details = get_size()
    details["time_taken"] = f"{end} sec"
    details["Operation"] = f"Inserted {rows} records"
    details["records_after_op"] = records_before_op + rows

    records_before_op += rows

    append_to_file(analysis_file, details)
    analysis_info.append(details)

    print(f"inserted {rows} records..")

    # The code below this point only matters for the disabled update
    # benchmark: it is reached only when the total row count gains a digit
    # (10x growth), at which point the table is reloaded.
    current_digit = len(str(records_before_op))

    if current_digit <= digits:
        continue

    digits = current_digit
    df = spark.table(TABLE_NAME)

    # Disabled update benchmark, kept for reference.
    # NOTE(review): it refers to vendor_id / fare_amount and to table
    # demo.nyc.taxis_100_50COLUMNS, neither of which matches the table used
    # above - update before re-enabling.
    # for vendor_id in df.select("vendor_id").distinct().collect()[:10]:
    #     vendor_id = vendor_id[0]
    #     df = spark.table("demo.nyc.taxis_100_50COLUMNS")

    #     st = time.time()
    #     updated_df = df.withColumn("fare_amount", 
    #                               when(col("vendor_id") == vendor_id, col("fare_amount") + 40)
    #                               .otherwise(col("fare_amount")))
        
    #     updated_df.writeTo("demo.nyc.taxis_100_50COLUMNS").overwritePartitions()
        
    #     end = time.time() - st
    #     rows = updated_df.filter(updated_df["vendor_id"] == vendor_id).count()
        
    #     details = get_size()
    #     details["time_taken"] = f"{end} sec"
    #     details["Operation"] = f"Updated {rows} records"
    #     details["records_after_op"] = records_before_op

    #     append_to_file(analysis_file, details)
        
    #     del df, st, end
    #     analysis_info.append(details)
print(f"\nTotal insertion time: {total_insertion_time:.2f} sec")

Enter input file type csv or parquet? :  parquet


Started with file=records_1000000_part_10_1740401457.66906.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_1_1740398687.6853974.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_2_1740398997.7710938.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_3_1740399303.6597402.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_4_1740399611.4401598.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_5_1740399918.8825066.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_6_1740400229.5675209.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_7_1740400532.7327414.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_8_1740400841.6608176.parquet


                                                                                

inserted 1000000 records..
Started with file=records_1000000_part_9_1740401151.339735.parquet


                                                                                

inserted 1000000 records..

Total insertion time: 278.41 sec
