In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import *

# --- Source: Azure Open Datasets, NYC TLC "yellow" trips (storage in EastUS, ~50GB of data) ---
# No SAS token is needed for this source, so it is intentionally left empty.
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = r""

# Register the (empty) SAS token for the source container so Spark can read the blob remotely.
# NOTE: in a Synapse workspace with DEP (data exfiltration protection) enabled this remote read
# is blocked, so run this from a workspace without DEP.
wasbs_path = f'wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}'
sas_conf_key = f'fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net'
spark.conf.set(sas_conf_key, blob_sas_token)
print(f'Source blob path: {wasbs_path}')

# --- Target: an ADLS Gen2 account/container you own (replace the placeholders) ---
# Synapse authenticates to its default ADLS storage as the current user automatically.
# On Databricks or another Spark runtime, set a SAS token in spark conf as done for the source above.
adls_account_name = 'your-storage-account'
adls_container_name = 'your-container'
parquet_relative_path = 'nyctlc-nopartition-parquet'
csv_gzip_relative_path = 'nyctlc-nopartition-csv-gzip'
csv_uncompressed_relative_path = 'nyctlc-nopartition-csv'

# All three outputs live side by side under the same container root; each path ends with '/'.
adls_root = f'abfss://{adls_container_name}@{adls_account_name}.dfs.core.windows.net'
parquet_adls_path = f'{adls_root}/{parquet_relative_path}/'
csv_gzip_adls_path = f'{adls_root}/{csv_gzip_relative_path}/'
csv_uncompressed_adls_path = f'{adls_root}/{csv_uncompressed_relative_path}/'
print(f'Target parquet path: {parquet_adls_path}')
print(f'Target gzip csv path: {csv_gzip_adls_path}')
print(f'Target uncompressed csv path: {csv_uncompressed_adls_path}')

In [None]:
# Load the NYC TLC yellow-trip dataset, stored as Parquet, from the source blob path.
df = spark.read.format('parquet').load(wasbs_path)

In [None]:
# Add a string column "hashCol" built by concatenating the pickup and dropoff timestamps,
# each rendered with the 14-character pattern below.
# Use HH (hour-of-day, 00-23), NOT hh (clock-hour-of-am-pm, 01-12): with hh, times twelve
# hours apart (e.g. 01:30 and 13:30) render identically, so distinct trips collapse to the
# same key.
# If either timestamp is null, concat() returns null, so hashCol is null for that row.
hash_ts_pattern = "yyyyMMddHHmmss"
df_with_hashcol = df.withColumn(
    "hashCol",
    concat(
        date_format('tpepPickupDateTime', hash_ts_pattern),
        date_format('tpepDropoffDateTime', hash_ts_pattern),
    ),
)

In [None]:
# Write the hash-augmented data as unpartitioned Parquet, replacing any previous output.
# Expected output size: ~78GB across ~499 files (~156MB per file).
df_with_hashcol.write.mode('overwrite').parquet(parquet_adls_path)

In [None]:
# Write the same data as gzip-compressed CSV (Spark's default: no header row),
# replacing any previous output.
# Expected output size: ~61GB across ~499 files (~122MB per file).
df_with_hashcol.write.mode('overwrite').option('compression', 'gzip').csv(csv_gzip_adls_path)

In [None]:
# Write the same data as plain, uncompressed CSV, replacing any previous output.
# Expected output size: ~261GB across ~499 files (~523MB per file).
df_with_hashcol.write.mode('overwrite').csv(csv_uncompressed_adls_path)