In [1]:
!pip install s3fs

Collecting s3fs
  Downloading s3fs-2023.12.2-py3-none-any.whl.metadata (1.6 kB)
Collecting aiobotocore<3.0.0,>=2.5.4 (from s3fs)
  Downloading aiobotocore-2.9.0-py3-none-any.whl.metadata (20 kB)
Collecting fsspec==2023.12.2 (from s3fs)
  Downloading fsspec-2023.12.2-py3-none-any.whl.metadata (6.8 kB)
Collecting aiohttp!=4.0.0a0,!=4.0.0a1 (from s3fs)
  Downloading aiohttp-3.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.4 kB)
Collecting botocore<1.33.14,>=1.33.2 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading botocore-1.33.13-py3-none-any.whl.metadata (6.1 kB)
Collecting wrapt<2.0.0,>=1.10.10 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading wrapt-1.16.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting aioitertools<1.0.0,>=0.5.1 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading aioitertools-0.11.0-py3-none-any.whl (23 kB)
Collecting multidict<7.0,>=4.5 (from aiohttp!=4.0.

In [1]:
import os
import s3fs
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as T
import datetime
import time

In [2]:
# Define environment variables for the MinIO connection.
# setdefault() keeps any value that is already exported in the environment
# (e.g. real credentials or a different endpoint) and only falls back to the
# local demo values below when nothing is set.
# NOTE: these are demo defaults for a local MinIO container; do not reuse them
# for a real deployment.
os.environ.setdefault("MINIO_KEY", "minio")
os.environ.setdefault("MINIO_SECRET", "minio123")
os.environ.setdefault("MINIO_ENDPOINT", "http://minio1:9000")

In [3]:
# Build (or reuse) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName("big_data_file_formats")
    # Extra JARs: hadoop-aws + AWS SDK bundle for s3a://, plus Avro and Delta Lake support.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.spark:spark-avro_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0")
    # S3A settings: endpoint and credentials come from the MINIO_* environment variables.
    .config("spark.hadoop.fs.s3a.endpoint", os.environ["MINIO_ENDPOINT"])
    .config("spark.hadoop.fs.s3a.access.key", os.environ["MINIO_KEY"])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["MINIO_SECRET"])
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Delta Lake SQL extension and catalog implementation.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
    .getOrCreate()
)

In [5]:
# Ask the JVM behind this Spark session which Scala version it runs
# (the spark-avro / delta-spark artifacts are built per Scala version).
_scala_properties = spark.sparkContext._jvm.scala.util.Properties
scala_version = _scala_properties.versionString()
print(scala_version)

version 2.12.18


In [6]:
# Generate sample data: an `id` column from spark.range() plus nine synthetic columns.
num_rows = 10000000
RANDOM_SEED = 42  # base seed; each column gets its own offset so columns are not correlated
df = spark.range(0, num_rows)

# Add columns 1..9, alternating by index parity: even -> integer, odd -> string.
# Seeding rand()/randn() keeps the generated values stable across kernel
# restarts (for a given partitioning of the range), so results can be compared
# between runs.
for i in range(1, 10):  # `id` is already the first column
    column_seed = RANDOM_SEED + i
    if i % 2 == 0:
        # Integer column: standard-normal sample scaled by 100 and truncated to int
        df = df.withColumn(f"int_col_{i}", (F.randn(seed=column_seed) * 100).cast(T.IntegerType()))
    else:
        # String column: uniform sample in [0, num_rows) truncated to int, rendered as text
        df = df.withColumn(f"str_col_{i}", (F.rand(seed=column_seed) * num_rows).cast(T.IntegerType()).cast("string"))

# Run a count to evaluate the plan and confirm the row count
df.count()

10000000

In [7]:
# Preview the first 10 rows of the sample data without truncating cell values
df.show(n=10, truncate=False)

+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|id |str_col_1|int_col_2|str_col_3|int_col_4|str_col_5|int_col_6|str_col_7|int_col_8|str_col_9|
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|0  |7764018  |128      |1632029  |-15      |5858297  |114      |1025493  |-88      |7376083  |
|1  |2618524  |118      |912383   |235      |6684042  |-115     |9882176  |170      |3220749  |
|2  |6351000  |75       |3515510  |26       |2605886  |89       |3217428  |87       |4045983  |
|3  |4346827  |-70      |2627979  |-23      |9543505  |69       |2421674  |-141     |7049734  |
|4  |9458796  |-106     |6374672  |-142     |5550170  |25       |4842269  |-97      |5265771  |
|5  |9203992  |23       |4818602  |42       |530044   |28       |5560538  |-75      |2307858  |
|6  |8900698  |-130     |2735238  |-135     |1308929  |22       |3279458  |-22      |3412851  |
|7  |6876605  |-35      |6690534  |-41  

In [8]:
# Print the schema of the sample data (column names, types and nullability)
df.printSchema()

root
 |-- id: long (nullable = false)
 |-- str_col_1: string (nullable = true)
 |-- int_col_2: integer (nullable = true)
 |-- str_col_3: string (nullable = true)
 |-- int_col_4: integer (nullable = true)
 |-- str_col_5: string (nullable = true)
 |-- int_col_6: integer (nullable = true)
 |-- str_col_7: string (nullable = true)
 |-- int_col_8: integer (nullable = true)
 |-- str_col_9: string (nullable = true)



In [9]:
# Write 4 identical CSV copies of the sample data, one input per file format
# being compared.
# header=True: these files are loaded later with option("header", True); without
# a header row Spark would consume the first data row of every part file as the
# header and silently drop it.
# mode="overwrite": lets this cell be re-run instead of failing because the
# target path already exists.
for target_format in ("parquet", "avro", "orc", "delta"):
    df.write.csv(
        f"s3a://mybucket/ten_million_{target_format}.csv",
        header=True,
        mode="overwrite",
    )

In [11]:
# Load the four CSV inputs as DataFrames using an explicit schema
# (no schema inference pass over the files).
_csv_columns = [
    # (column name, Spark type, nullable)
    ("id", T.LongType(), False),
    ("str_col_1", T.StringType(), True),
    ("int_col_2", T.IntegerType(), True),
    ("str_col_3", T.StringType(), True),
    ("int_col_4", T.IntegerType(), True),
    ("str_col_5", T.StringType(), True),
    ("int_col_6", T.IntegerType(), True),
    ("str_col_7", T.StringType(), True),
    ("int_col_8", T.IntegerType(), True),
    ("str_col_9", T.StringType(), True),
]
schema = T.StructType([
    T.StructField(col_name, col_type, nullable=col_nullable)
    for col_name, col_type, col_nullable in _csv_columns
])


def _load_csv(path):
    """Return a DataFrame over the CSV dataset at ``path``, read with ``schema``.

    The first line of each file is treated as a header row.
    NOTE(review): confirm the files at ``path`` really contain a header row;
    if they do not, the first data row of each file is skipped.
    """
    return spark.read.format("csv").option("header", True).schema(schema).load(path)


df_csv_parquet = _load_csv("s3a://mybucket/ten_million_parquet.csv")
df_csv_avro = _load_csv("s3a://mybucket/ten_million_avro.csv")
df_csv_orc = _load_csv("s3a://mybucket/ten_million_orc.csv")
df_csv_delta = _load_csv("s3a://mybucket/ten_million_delta.csv")

In [12]:
# Benchmark: convert the CSV-backed DataFrame to Parquet and report wall-clock time.
# The timed region covers reading the CSV input as well as writing the output,
# because the DataFrame is only evaluated when the write runs.
start_time = time.time()
df_csv_parquet.write.format("parquet").save("s3a://mybucket/ten_million_parquet2.parquet")
end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Time taken to write as Parquet: {elapsed_seconds} seconds")

Time taken to write as Parquet: 15.137483358383179 seconds


In [13]:
# Benchmark: convert the CSV-backed DataFrame to Avro and report wall-clock time.
# The timed region covers reading the CSV input as well as writing the output,
# because the DataFrame is only evaluated when the write runs.
start_time = time.time()
df_csv_avro.write.save("s3a://mybucket/ten_million_avro2.avro", format="avro")
end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Time taken to write as Avro: {elapsed_seconds} seconds")

Time taken to write as Avro: 12.808173656463623 seconds


In [14]:
# Benchmark: convert the CSV-backed DataFrame to ORC and report wall-clock time.
# The timed region covers reading the CSV input as well as writing the output,
# because the DataFrame is only evaluated when the write runs.
start_time = time.time()
df_csv_orc.write.format("orc").save("s3a://mybucket/ten_million_orc2.orc")
end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Time taken to write as ORC: {elapsed_seconds} seconds")

Time taken to write as ORC: 12.936703443527222 seconds


In [15]:
# Benchmark: convert the CSV-backed DataFrame to a Delta table and report wall-clock time.
# The timed region covers reading the CSV input as well as writing the output,
# because the DataFrame is only evaluated when the write runs.
start_time = time.time()
df_csv_delta.write.save("s3a://mybucket/ten_million_delta2.delta", format="delta")
end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Time taken to write as Delta Lake: {elapsed_seconds} seconds")

Time taken to write as Delta Lake: 17.779020309448242 seconds


In [16]:
# Perform aggregation query using Parquet data
# The reader is set up *before* the timer starts, exactly as in the Avro, ORC
# and Delta query cells, so that all four timings measure the same work.
df_parquet = spark.read.parquet("s3a://mybucket/ten_million_parquet2.parquet")
start_time = time.time()
# Count rows per (str_col_5, str_col_7) pair and show the group with the smallest count.
# NOTE: several groups may tie on the smallest count, in which case the row
# that is displayed is not guaranteed to be the same on every run.
(
    df_parquet
    .select("str_col_5", "str_col_7", "int_col_2")
    .groupBy("str_col_5", "str_col_7")
    .count()
    .orderBy("count")
    .limit(1)
    .show(truncate=False)
)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1        |6429997  |1    |
+---------+---------+-----+

Time taken for query: 12.326371192932129 seconds


In [20]:
# Benchmark: aggregation query over the Avro copy of the data.
# The reader is set up outside the timed region.
df_avro = spark.read.load("s3a://mybucket/ten_million_avro2.avro", format="avro")
start_time = time.time()
# Count rows per (str_col_5, str_col_7) pair and show the group with the smallest count.
# NOTE: several groups may tie on the smallest count, in which case the row
# that is displayed is not guaranteed to be the same on every run.
(
    df_avro
    .select("str_col_5", "str_col_7", "int_col_2")
    .groupBy("str_col_5", "str_col_7")
    .count()
    .orderBy("count")
    .limit(1)
    .show(truncate=False)
)
end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Time taken for query: {elapsed_seconds} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1        |6429997  |1    |
+---------+---------+-----+

Time taken for query: 15.421005725860596 seconds


In [4]:
# Benchmark: aggregation query over the ORC copy of the data.
# The reader is set up outside the timed region.
df_orc = spark.read.format("orc").load("s3a://mybucket/ten_million_orc2.orc")
start_time = time.time()
# Count rows per (str_col_5, str_col_7) pair and show the group with the smallest count.
# NOTE: several groups may tie on the smallest count, in which case the row
# that is displayed is not guaranteed to be the same on every run.
(
    df_orc
    .select("str_col_5", "str_col_7", "int_col_2")
    .groupBy("str_col_5", "str_col_7")
    .count()
    .orderBy("count")
    .limit(1)
    .show(truncate=False)
)
end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Time taken for query: {elapsed_seconds} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1        |2906292  |1    |
+---------+---------+-----+

Time taken for query: 13.445027828216553 seconds


In [5]:
# Benchmark: aggregation query over the Delta copy of the data.
# The reader is set up outside the timed region.
df_delta = spark.read.load("s3a://mybucket/ten_million_delta2.delta", format="delta")
start_time = time.time()
# Count rows per (str_col_5, str_col_7) pair and show the group with the smallest count.
# NOTE: several groups may tie on the smallest count, in which case the row
# that is displayed is not guaranteed to be the same on every run.
(
    df_delta
    .select("str_col_5", "str_col_7", "int_col_2")
    .groupBy("str_col_5", "str_col_7")
    .count()
    .orderBy("count")
    .limit(1)
    .show(truncate=False)
)
end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Time taken for query: {elapsed_seconds} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1        |2906292  |1    |
+---------+---------+-----+

Time taken for query: 15.512012004852295 seconds
