# PySpark Training Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


####  Run these cells to configure your interactive session

In [None]:
%idle_timeout 30
%glue_version 5.0
%worker_type G.1X
%number_of_workers 4

In [None]:
%%configure
{
    "--enable-continuous-cloudwatch-log": "true",
    "--enable-spark-ui": "true",
    "--spark-event-logs-path": "s3://dip-pyspark-training/spark_ui_tmp/",
    "--enable-metrics": "true",
    "--enable-observability-metrics": "true",
    "--conf": "spark.sql.codegen.comments=true --conf spark.sql.codegen.fallback=true --conf spark.sql.codegen.wholeStage=true --conf spark.sql.ui.explainMode=extended --conf spark.sql.ui.retainedExecutions=100 --conf spark.ui.retainedJobs=1000 --conf spark.ui.retainedStages=1000 --conf spark.ui.retainedTasks=10000 --conf spark.ui.showAdditionalMetrics=true"
}

### Start spark session 

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the SparkContext that already exists for this session, or create one if none does.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext and take its SparkSession;
# `spark` is the handle the later cells use for reading, writing and configuration.
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Job object bound to the GlueContext; it is not referenced by any cell visible here.
job = Job(glueContext)

### Get spark's configuration

In [None]:
# Take one snapshot of the Spark configuration and read the sizing-related
# settings from it. A key that is not set comes back as None and is printed as such.
spark_conf = spark.sparkContext.getConf()

# Dynamic allocation settings
dynamic_allocation_enabled = spark_conf.get('spark.dynamicAllocation.enabled')
dynamic_min_executors = spark_conf.get('spark.dynamicAllocation.minExecutors')
dynamic_max_executors = spark_conf.get('spark.dynamicAllocation.maxExecutors')
dynamic_initial_executors = spark_conf.get('spark.dynamicAllocation.initialExecutors')

# Executor sizing
executor_instances = spark_conf.get('spark.executor.instances')
executor_cores = spark_conf.get('spark.executor.cores')
executor_memory = spark_conf.get('spark.executor.memory')

# Driver sizing
driver_cores = spark_conf.get('spark.driver.cores')
driver_memory = spark_conf.get('spark.driver.memory')

# The leading and trailing empty entries reproduce the blank line printed
# before and after the report.
section_separator = '----------------------------------------'
report_lines = [
    '',
    f'Dynamic allocation enabled: {dynamic_allocation_enabled}',
    f'Dynamic min executors: {dynamic_min_executors}',
    f'Dynamic max executors: {dynamic_max_executors}',
    f'Dynamic initial executors: {dynamic_initial_executors}',
    section_separator,
    f'Executor instances: {executor_instances}',
    f'Executor cores: {executor_cores}',
    f'Executor memory: {executor_memory}',
    section_separator,
    f'Driver cores: {driver_cores}',
    f'Driver memory: {driver_memory}',
    '',
]
print('\n'.join(report_lines))

### Import libraries

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import datetime

### Loading the New York taxi dataset

In [None]:
# Load the partitioned copy of the NY taxi dataset (Parquet on S3) into p_df.
ny_taxi_partitioned_path = 's3://dip-pyspark-training/data/big/ny-taxi-dataset-partitioned/'
p_df = (
    spark.read
    .format('parquet')
    .load(ny_taxi_partitioned_path)
)

In [None]:
# p_df.rdd.getNumPartitions()

In [None]:
# Column values for the small lookup DataFrame, one list per column.
# Together they cover every (vendor, payment type) pair once, each with its own marker letter.
vendors = ['VTS', 'CMT', 'DDS', 'VTS', 'CMT', 'DDS']
payment_type = ['CASH', 'CASH', 'CASH', 'CREDIT', 'CREDIT', 'CREDIT']
extra_col = ['A', 'B', 'C', 'D', 'E', 'F']

# All three columns are non-nullable strings.
schema = T.StructType([
    T.StructField(column_name, T.StringType(), False)
    for column_name in ("vendor_id", "payment_type", "extra_col_from_m")
])

# Pair the column lists up position by position into one tuple per row.
data = list(zip(vendors, payment_type, extra_col))

# Build the PySpark DataFrame from the rows and the explicit schema.
m_df = spark.createDataFrame(data, schema)
# m_df.show()

In [None]:
# to_join_location = 's3://dip-pyspark-training/data/big/to_join_data/'
# m_df.write.format('parquet').mode('overwrite').save(to_join_location)

In [None]:
# spark_application_id = spark.sparkContext.applicationId.split('-')[-1]
# tmp_table_name = f'{spark_application_id}_tmp_table'
# tmp_table_name

In [None]:
# Register table on the spark catalog
# spark.sql(f"""
# CREATE TABLE {tmp_table_name} (
#     vendor_id STRING,
#     payment_type STRING,
#     extra_col_from_m STRING
# )
# STORED AS PARQUET
# LOCATION '{to_join_location}'
# """)

In [None]:
# Make sure we obtain the metadata needed to fetch the size of this table only
# spark.sql(f'ANALYZE TABLE {tmp_table_name} COMPUTE STATISTICS')

In [None]:
# m_df_from_catalog = spark.sql(f'SELECT * FROM {tmp_table_name}')
# m_df_from_catalog.show()

In [None]:
# Inner-join the taxi data with the small lookup DataFrame on the two shared key columns.
join_keys = ['vendor_id', 'payment_type']
p_joined_df = p_df.join(m_df, on=join_keys, how='inner')
# Alternative join inputs kept for comparison during the exercise:
# p_joined_df = p_df.join(m_df.hint('broadcast'), on=join_keys, how='inner')
# p_joined_df = p_df.join(m_df_from_catalog, on=join_keys, how='inner')

# Print the extended plan output so the chosen join strategy can be inspected.
p_joined_df.explain(extended=True)

In [None]:
# Write the joined result to S3 as Parquet (replacing any previous output at
# that location) and report how long the write took in whole seconds.
output_file_path_partitioned = 's3://dip-pyspark-training/output/merged-dataset-02/'

ts = datetime.datetime.now()
p_joined_df.write.format('parquet').mode('overwrite').save(output_file_path_partitioned)
# total_seconds() covers the whole interval; timedelta.seconds is only the
# seconds component and silently drops any full days. int() keeps p_pt an
# integer number of seconds, as before.
p_pt = int((datetime.datetime.now() - ts).total_seconds())
print(f'The processing time was {p_pt} seconds')

In [None]:
# p_tmp_df = spark.read.format('parquet').load(output_file_path_partitioned)
# p_tmp_df.show(5)

In [None]:
# p_tmp_df.count()

In [None]:
# spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

In [None]:
# spark.conf.get("spark.sql.join.preferSortMergeJoin")

In [None]:
# drop the temporary table
# spark.sql(f'DROP TABLE {tmp_table_name}')