### Start SparkSession

In [None]:
import os, subprocess, sys
# Show the environment variables the local Java/Hadoop setup relies on;
# os.environ.get returns None for a variable that is not set.
print("JAVA_HOME:", os.environ.get("JAVA_HOME"))
# Labelled like the JAVA_HOME line so the two outputs can be told apart.
print("HADOOP_HOME:", os.environ.get("HADOOP_HOME"))

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# Return the existing SparkSession if there is one, otherwise build a new one named "ivs".
spark = SparkSession.builder.appName("ivs").getOrCreate()

# Address of this session's Spark web UI.
print("Spark session:", spark.sparkContext.uiWebUrl)

In [None]:
# Source CSV (read below with sep=';' and a header row).
PATH = "data/customs_data.csv"

In [None]:
# Naive read with no options: Spark's CSV defaults (comma separator, no header)
# apply, so the columns come back with generic _c0, _c1, ... names.
spark.read.csv(path=PATH).show(n=20)

In [None]:
# Read with the real delimiter and take column names from the first line.
df = spark.read.csv(path=PATH, header=True, sep=';')

# Up to 20 rows, without truncating long cell values.
df.show(n=20, truncate=False)

# Two rows, untruncated, printed vertically (one field per line).
df.show(n=2, truncate=False, vertical=True)

### PrintSchema

In [None]:
# Print the schema as a tree. `df` was read without a schema or inferSchema,
# so the columns are expected to be plain strings.
df.printSchema()

In [None]:
# Drop the "_eng" suffix from the two English-language columns.
result = (
    df
    .withColumnRenamed("direction_eng", "direction")
    .withColumnRenamed("measure_eng", "measure")
)

result.columns

### Select

In [None]:
# First four rows after the renames.
result.show(n=4)

In [None]:
# Up to ten distinct country codes, untruncated.
(
    result
    .select('country')
    .distinct()
    .show(n=10, truncate=False)
)

### GroupBy

In [None]:
# Row count per country, largest first.
(
    result
    .groupBy('country')
    .agg(F.count('*').alias('total_rows'))
    .sort(F.desc('total_rows'))
    .show(n=20)
)

### Filter

In [None]:
# Default preview: first 20 rows.
result.show(n=20)

In [None]:
# Rows for country 'DE' that carry a non-null `value`; the two conditions are
# combined into a single filter with &.
df_de = result.where(
    (F.col('country') == 'DE') & F.col('value').isNotNull()
)

print(df_de.count())

# Equivalent filter written as SQL expression strings:
# df_de2 = (
#     result
#     .where(''' country == "DE" ''')
#     .where(''' value IS NOT NULL ''')
# )
#
# print(df_de.count() == df_de2.count())

### Save to CSV

In [None]:
# Column names available for the final projection.
list(df_de.columns)

In [None]:
# Output projection: fixed column order, with load_date cast from string to date
# (the cast column keeps the name 'load_date', which the writers partition on).
final = df_de.select(
    *[
        'month', 'country', 'code', 'value', 'netto', 'quantity',
        'region', 'district', 'direction', 'measure',
    ],
    F.col('load_date').cast('date'),
)

final.show(2, truncate=False)

In [None]:
# Four rows of the final projection (long values truncated).
final.show(n=4, truncate=True)

In [None]:
# ----------------------------------------------------------------------------------
# Several ways of saving the `final` DataFrame, each controlling the number and
# layout of the output files differently.
# Every write uses mode('overwrite') so the cell can be re-run; the default mode
# fails if the target directory already exists. All four outputs are written
# because the "Read Transformed" section reads each of these directories.
# ----------------------------------------------------------------------------------

# 1. Uncontrolled file count: one output file per current partition of `final`.
(
    final
    .write
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_no_control')
)

# partition_num = final.rdd.getNumPartitions()
# print(f'Кол-во партиций {partition_num}')  # prints the current partition count

# 2. Exactly one output file: coalesce(1) merges all partitions into one
#    (no full shuffle, but the whole write then runs as a single task).
(
    final
    .coalesce(1)
    .write
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_one_file')
)

# partition_num = final.coalesce(1).rdd.getNumPartitions()
# print(f'Кол-во партиций {partition_num}')  # prints 1

# 3. Partitioned layout: one sub-directory per distinct load_date value.
#    Every task writes a file into each directory it has rows for, so one
#    directory can end up with many small files.
(
    final
    .write
    .mode('overwrite')
    .partitionBy('load_date')
    .options(header='True', sep=';')
    .csv('data/final_partitioned')
)

# print_df = final.select('load_date').distinct()
# print(f'Load_date distinct: {print_df.count()}')  # prints the number of load_date directories

# 4. Partitioned layout with ONE file per directory.
#    Hash-repartitioning by load_date puts all rows for a given date into the
#    same partition, so each load_date=... directory gets exactly one file.
#    Different dates are still written in parallel.
#    (repartition(1, 'load_date') also gives one file per directory, but it
#    forces the entire write through a single task.)
(
    final
    .repartition('load_date')
    .write
    .mode('overwrite')
    .partitionBy('load_date')
    .options(header='True', sep=';')
    .csv('data/final_partitioned_repart')
)

# partition_num = final.repartition('load_date').rdd.getNumPartitions()
# print(f'Кол-во партиций {partition_num}')  # shuffle partition count (spark.sql.shuffle.partitions), NOT the number of load_dates

### Read Transformed

In [None]:
# Read back each saved layout and keep only the rows for load_date 2024-01-01.
# Every directory must already exist; spark.read.csv fails on a missing path.


def _read_2024_01_01(path):
    """Read a ';'-separated CSV output with a header, filtered to load_date 2024-01-01."""
    return (
        spark.read
        .csv(path, header=True, sep=';')
        .where(''' load_date = "2024-01-01" ''')
    )


reader_no_control = _read_2024_01_01('data/final_no_control/')
reader_final_one_file = _read_2024_01_01('data/final_one_file/')
reader_partitioned = _read_2024_01_01('data/final_partitioned')
reader_partitioned_repart = _read_2024_01_01('data/final_partitioned_repart')

# Observed Spark UI metrics per layout (files read | bytes read | duration).
# The partitioned layouts read 16.4 MiB instead of 88.4 MiB for this date.

#reader_no_control.count() # number of files read: 16 | size of files read: 88.4 MiB | 2.5 s (90 ms, 301 ms, 384 ms)

#reader_final_one_file.count() # number of files read: 1 | size of files read: 88.4 MiB | 3.2 s (306 ms, 407 ms, 420 ms )

#reader_partitioned.count() # number of files read: 16 | size of files read: 16.4 MiB | 305 ms (32 ms, 39 ms, 54 ms )

reader_partitioned_repart.count() # number of files read: 1 | size of files read: 16.4 MiB | 179 ms (9 ms, 43 ms, 44 ms )

### JOIN

In [None]:
# Small in-memory lookup table: region code -> region name.
data = [
    (14000, "Northern"),
    (11000, "Southern"),
    (10000, "Eastern"),
    (26000, "Western"),
    (56000, "Central"),
]
region_df = spark.createDataFrame(data, schema='region_id long, name string')

region_df.show()


# Raw source data for the join experiments. Reuse the shared PATH constant
# rather than repeating the file name literal.
# NOTE(review): no schema/inferSchema is given, so `region` is read as a string.
# The join below compares it with the long `region_id` and relies on Spark's
# implicit cast.
customs_data = (
    spark
    .read
    .csv(PATH, header=True, sep=';')
)

customs_data.show(2)

In [None]:
import time

# Disable automatic broadcast joins: a threshold of -1 means no table is ever
# small enough to be broadcast unless the query explicitly asks for it.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
# No broadcast join: assumes spark.sql.autoBroadcastJoinThreshold was set to -1
# earlier, so this left join runs as a shuffle-based join.
# time.perf_counter() is a monotonic, high-resolution clock meant for measuring
# intervals; time.time() is wall-clock time and can jump if the clock is adjusted.

start_time = time.perf_counter()

# count() is the action that actually runs the join.
customs_data.join(region_df, customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Elapsed time for join operation: {end_time - start_time:.2f} seconds")

In [None]:
# Broadcast join: the explicit F.broadcast hint ships the small region_df to
# every executor, avoiding a shuffle of customs_data.
# time.perf_counter() is a monotonic clock intended for measuring intervals.

start_time = time.perf_counter()

# count() is the action that actually runs the join.
customs_data.join(F.broadcast(region_df), customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Elapsed time for broadcast join operation: {end_time - start_time:.2f} seconds")

### Cache | Persist

In [None]:
# Mark the DataFrame for caching (lazy), then run an action to materialize it.
customs_data.cache()
customs_data.count()

In [None]:
# Drop the cached blocks without waiting for the removal to finish.
customs_data.unpersist(blocking=False)

In [None]:
from pyspark.storagelevel import StorageLevel

# Persist to disk only (no memory copy), then run an action to materialize it.
customs_data.persist(storageLevel=StorageLevel.DISK_ONLY).count()

### Repartition & Coalesce

In [None]:
# Configured driver memory. SparkConf.get returns None when the key was never
# set explicitly; in that case show Spark's documented default of "1g".
print(spark.sparkContext.getConf().get("spark.driver.memory", "1g"))

In [None]:
# (1, 'one') ... (9, 'nine'): small demo data for the partitioning cells below.
data = list(enumerate(
    ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine'],
    start=1,
))

df = spark.createDataFrame(data, schema=['id', 'number'])

df.show(n=20)

In [None]:
# Number of partitions Spark created for the small in-memory `df`
# (presumably driven by the default parallelism for local data — TODO confirm).
df.rdd.getNumPartitions()

In [None]:
# Full shuffle into exactly 3 partitions. glom().collect() returns one list of
# Rows per partition, showing how the rows were distributed.
# (The original referenced an undefined name `mix`; the demo DataFrame is `df`.)
df.repartition(3).rdd.glom().collect()

In [None]:
# Merge existing partitions down to 2 without a full shuffle (a narrow
# dependency). Shows the Rows held by each resulting partition.
# (Uses `df`; the original referenced the undefined name `mix`.)
df.coalesce(2).rdd.glom().collect()

In [None]:
# Collect the whole (tiny) DataFrame to the driver as a pandas DataFrame and
# show its first rows. Requires pandas to be installed.
# (Uses `df`; the original referenced the undefined name `mix`.)
df.toPandas().head()

In [None]:
# OUT OF MEMORY demo (intentional): collect() pulls every row of the full
# dataset into the driver process. On a large file this can exhaust driver
# memory; outside a demo, prefer show()/take()/limit().
# Read with the same source path and ';' delimiter as the rest of the notebook.

d = spark.read.csv(PATH, header=True, sep=';')
d.collect()

In [None]:
# Stop the SparkSession and its underlying SparkContext; DataFrames created
# above can no longer be used after this.
spark.stop()