### Guidelines

**Note:** This notebook aims to make our life easier and provide quick and nice workable solutions. Feel free to suggest changes or performance improvements. All cells must be "self executable" in order to isolate the dependencies.

In [0]:
from pyspark.sql.functions import col, lit, dense_rank, row_number
from pyspark.sql.window import Window

"""
This optimization technique aims to use Window to create an index and filter the rows you need instead performing a join to get the data needed. This also may be helpful when you need to remove duplicates, instead of using distinct/drop duplicates you can use the Window to generate an index and pick up the element you want.azure

Why: Window triggers a narrow transformation, whereas distinct / join triggers a wide trasnfromation and consequently a shuffle in the cluster.

If you are new to Narrow x Wide transformations and Spark check this quick explanation:
https://www.linkedin.com/posts/minalbhatkar_bigdata-dataengineer-dataanalytics-activity-7072807039709700096-JRP6/

"""
print("\n")
print("GETTING THE DATA WITHOUT JOINING WITH ITSELF OR OTHER POTENTIAL LOOKUP TABLE")
window = Window.partitionBy("ParentId").orderBy(col("NodeId"))

sample_data = [("A535ersgdf1", ""),
               ("Bgdfgdgdsh","A535ersgdf1"),
               ("C56464dfgdf","A535ersgdf1"),
               ("D54654654df","Bgdfgdgdsh")
              ]

sample_df = spark.createDataFrame(sample_data, ["NodeId","ParentId"])
sample_df.display()

sample_ranked_df = sample_df.withColumn("rank", dense_rank().over(window))
sample_ranked_df.display()

sample_ranked_df.filter("rank == 1").display()

###############################################################################
print("\n")
print("DEALING WITH DUPLICATIONS!")

dup_data =  [  ("A535ersgdf1", ""),
               ("Bgdfgdgdsh","A535ersgdf1"),
               ("Bgdfgdgdsh","A535ersgdf1"),
               ("D54654654df","Bgdfgdgdsh")
            ]

dup_df = spark.createDataFrame(dup_data, ["NodeId","ParentId"])
dup_df.display()

window_dup = Window.orderBy(col("NodeId"))
window_dup2 = Window.partitionBy("NodeId").orderBy(col("rank"))

(dup_df.withColumn("rank", dense_rank().over(window_dup))
       .withColumn("rank2", row_number().over(window_dup2))
       .filter("rank2 == 1")
 ).display()

NodeId,ParentId
A535ersgdf1,
Bgdfgdgdsh,A535ersgdf1
C56464dfgdf,A535ersgdf1
D54654654df,Bgdfgdgdsh


NodeId,ParentId,rank
A535ersgdf1,,1
Bgdfgdgdsh,A535ersgdf1,1
C56464dfgdf,A535ersgdf1,2
D54654654df,Bgdfgdgdsh,1


NodeId,ParentId,rank
A535ersgdf1,,1
Bgdfgdgdsh,A535ersgdf1,1
D54654654df,Bgdfgdgdsh,1


NodeId,ParentId
A535ersgdf1,
Bgdfgdgdsh,A535ersgdf1
Bgdfgdgdsh,A535ersgdf1
D54654654df,Bgdfgdgdsh


NodeId,ParentId,rank,rank2
A535ersgdf1,,1,1
Bgdfgdgdsh,A535ersgdf1,2,1
D54654654df,Bgdfgdgdsh,3,1


In [0]:
from pyspark.sql.functions import explode, map_entries

# Note: Be careful, explode is a costly function

df_explode_test = spark.read.format("delta").load("SOME DELTA LAKE PATH")

df_with_array = df_explode_test.withColumn("my_map_col_array", map_entries(F.col("productJSON")))
#df_with_array.display()

exploded_df = df_with_array.withColumn("exploded_map", explode(F.col("my_map_col_array")))
#exploded_df.display()


exploded_df_final = (exploded_df.withColumn("key", F.col("exploded_map").getItem("key"))
                                .withColumn("value", F.col("exploded_map").getItem("value"))
                                .drop("exploded_map", "my_map_col_array")
                    )

exploded_df_final.display()   

# Other Example with in-memory data
data = [(1, {"a": 10, "b": 20}),
        (2, {"c": 30, "d": 40}),
        (3, {"e": 50, "f": 60})]

# Create a DataFrame
df = spark.createDataFrame(data, ["id", "data"])

df.display()

# Use map_entries to convert the "data" map column to an array of structs
df_with_entries = df.withColumn("entries", F.map_entries(col("data")))
df_with_entries.display()

exploded_df = df_with_entries.withColumn("exploded_map", F.explode(F.col("entries")))
exploded_df.display()

exploded_df_final = (exploded_df.withColumn("key", F.col("exploded_map").getItem("key"))
                                .withColumn("value", F.col("exploded_map").getItem("value"))
                                .drop("exploded_map", "entries")
                    )
exploded_df_final.display()

In [0]:
from pyspark.sql.functions import expr, map_from_arrays, map_values, map_keys, col, lit

# Sample data with a MapType column "data"
data = [(1, {"a": 10, "b": 20}),
        (2, {"c": 30, "d": 40}),
        (3, {"e": 50, "f": 60})]

# Create a DataFrame
df = spark.createDataFrame(data, ["id", "data"])
df.display()

(df
.withColumn("values", map_values(col("data")))
.withColumn("keys", map_keys(col("data")))
.withColumn("values_transformed", expr("transform(values, v -> v/10)"))
.withColumn("data_map_transformed", map_from_arrays(col("keys"), col("values_transformed")))
).display()

id,data
1,"Map(a -> 10, b -> 20)"
2,"Map(d -> 40, c -> 30)"
3,"Map(e -> 50, f -> 60)"


id,data,values,keys,values_transformed,data_map_transformed
1,"Map(a -> 10, b -> 20)","List(10, 20)","List(a, b)","List(1.0, 2.0)","Map(a -> 1.0, b -> 2.0)"
2,"Map(d -> 40, c -> 30)","List(40, 30)","List(d, c)","List(4.0, 3.0)","Map(d -> 4.0, c -> 3.0)"
3,"Map(e -> 50, f -> 60)","List(50, 60)","List(e, f)","List(5.0, 6.0)","Map(e -> 5.0, f -> 6.0)"


In [0]:
"""
pyspark.sql.DataFrame.transform() is used to chain the custom transformations and this function returns the new DataFrame after applying the specified transformations.

Arguments:
function to be applied to the dataframe + other values to be considered by the function passed
"""
from pyspark.sql.functions import upper, col, lit

simpleData = (("Java",4000, 5), 
              ("Python", 4600, 10),  
              ("Scala", 4100, 15),   
              ("Scala", 4500, 15),   
              ("PHP", 3000, 20),  
             )

columns= ["CourseName", "Fee", "Discount"]

# Create DataFrame
df = spark.createDataFrame(data = simpleData, schema = columns)
df.display()



# Transformation Pipeline
# Custom transformation 1
def to_upper_str_columns(df):
    return df.withColumn("CourseName", upper(col("CourseName")))

# Custom transformation 2
def reduce_price(reduceBy):
    def transformation(df):
        return df.withColumn("new_fee", col("fee") - lit(reduceBy))
    return transformation

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee", col("new_fee") - (col("new_fee") * col("discount")) / 100)

df2 = df.transform(to_upper_str_columns)
df2 = df2.transform(reduce_price(1000)) # Adjust the reduction amount as needed
df2 = df2.transform(apply_discount)

df2.display()

CourseName,Fee,Discount
Java,4000,5
Python,4600,10
Scala,4100,15
Scala,4500,15
PHP,3000,20


CourseName,Fee,Discount,new_fee,discounted_fee
JAVA,4000,5,3000,2850.0
PYTHON,4600,10,3600,3240.0
SCALA,4100,15,3100,2635.0
SCALA,4500,15,3500,2975.0
PHP,3000,20,2000,1600.0


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

def implode_data(df_for_imploding_df):
"""
Receives a dataframe with exploded columns, picks a schema to be based on (line 11) and apply collect_list
"""
    window = Window.paritionBy("TableId").orderBy("ComplexTypeColumn.NestedColumnToOrder", "ComplexTypeColumn.OtherId") # Configure as the nature of your data

    rank_window = Window.paritionBy("TableId").orderBy(desc("ComplexTypeColumn.NestedColumnToOrder")) # Configure as the nature of your data

    assembled_df = (df_for_imploding_df.select("*", struct(complextype_col_struct()).alias("ComplexTypeColumn"))
                                       .drop(*receipt_line_struct())
                                       .withColumn("ComplexTypeColumn", collect_list("ComplexTypeColumn").over(window))
                                       .withColumn("Rank", row_number().over(rank_window))
                                       .filter("Rank == 1")
                                       .drop("Rank")

    )

    #complextype_col_struct() --> Nested Schema 

    return assembled_df

In [0]:
from pyspark.sql.functions import lit, col, struct, collect_list, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Sample data
data = [
    (1, "A", 1.0, "hsua-df"),
    (2, "B", 2.0, "ahs-dkf")    
]

# Define the schema
schema = StructType([
         StructField("id", IntegerType(), True),
         StructField("item", StringType(), True),
         StructField("precision", StringType(), True),
         StructField("hash", StringType(), True)
])

def complex_schema():
    return StructType([
         StructField("item", StringType(), True),
         StructField("precision", StringType(), True),
         StructField("hash", StringType(), True)
    ])

# Create the DataFrame
df_for_imploding_df = spark.createDataFrame(data, schema)
df_for_imploding_df.display()

# Define the window specification
window_spec = Window().partitionBy("id")
sublevel_rank_window = Window.orderBy("id")

# Perform the operations
assembled_df = (df_for_imploding_df
    .withColumn("ComplexTypeColumn", struct("item", "precision", "hash"))
    .withColumn("ComplexTypeList", collect_list("ComplexTypeColumn").over(window_spec))
    .withColumn("Rank", row_number().over(sublevel_rank_window)) # --> Filter this rank if you have repetition in the "id"
)
# Show the resulting DataFrame
assembled_df.display()

id,item,precision,hash
1,A,1.0,hsua-df
2,B,2.0,ahs-dkf


id,item,precision,hash,ComplexTypeColumn,ComplexTypeList,Rank
1,A,1.0,hsua-df,"List(A, 1.0, hsua-df)","List(List(A, 1.0, hsua-df))",1
2,B,2.0,ahs-dkf,"List(B, 2.0, ahs-dkf)","List(List(B, 2.0, ahs-dkf))",2


In [0]:
x = complex_schema()
list(x)

In [0]:
import os
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, wait

def summing_test(item):

    return item[0] + item[1]


numbers_to_sum = [(1,2), (3,4), (5,6), (7,8)]


print("Num of CPUs available", multiprocessing.cpu_count())

no_of_products_to_run_in_parallel= 4 

pool = ThreadPoolExecutor(no_of_products_to_run_in_parallel)

results = pool.map(summing_test, numbers_to_sum) # does not block

for res in results:
    print(res) # print results as they become available

pool.shutdown()

In [0]:
from pyspark.sql.types import TimestampType

@udf(TimestampType())
def ticks_to_datetime(ticks):
  try:
    return datetime.datetime.fromtimestamp(float(ticks)/1000)
  except Exception:
    return ticks

In [0]:
from pyspark.sql.types import MapType, StringType

@udf(MapType(StringType(),StringType()))
def convert_string_to_json(input_string):

  key_value_pairs = {}

  if input_string is not None:

    split_string = [tuple(x.split(':')) for x in input_string.split(';') if len(tuple(x.split(':'))) == 2]

    for k,v in split_string:
        key_value_pairs[k] = v

  return key_value_pairs

In [0]:
def unpivot(df, key_columns):

  columns_to_pivot = [column for column in df.columns if column not in key_columns]
  list_of_alias_columns_for_stack = []
  
  for column in columns_to_pivot:
    list_of_alias_columns_for_stack.append(f"'{column}',cast({column} AS STRING)")
  str_of_alias_columns_for_stack = ",".join(list_of_alias_columns_for_stack)
  
  return (df.selectExpr(*key_columns, f"stack({len(columns_to_pivot)}, {str_of_alias_columns_for_stack}) as (Name, Value)"))

In [0]:
"""
This code extracts the power of workers + driver to write CSVs besides to use a for loop to save file by file

You can use the thread pool executor for an extra layer of paralellism (1 write per thread)
"""

from pyspark.sql.functions import col

# Sample data
data = [("apple", 1), ("orange", 2), ("banana", 3), ("apple", 4), ("apple", 5), ("orange", 6)]
df_fruits = spark.createDataFrame(data, ["fruit", "id"])

# Define queries
queries = {}
queries["1"] = "apple"
queries["2"] = "orange"

def filter_and_write_dataframe(df, query, delimiter=",", header=True, mode="overwrite"):
    # Filter the DataFrame based on the query value
    filtered_df = df.filter(col("fruit") == query)
    
    # Write the filtered DataFrame to CSV
    filtered_df.write.csv(path="Your-Path-Here" + query, sep=delimiter, header=header, mode=mode)

# Create separate DataFrames for each query
dfs = {query: df_fruits.filter(col("fruit") == query) for query in queries.values()}

# Apply the function to each DataFrame
for query, df in dfs.items():
    filter_and_write_dataframe(df, query)

In [0]:
def create_dataframe_from_dict(list_of_dict_rows, schema, enforce_none_policy=True):
  
  final_list_of_rows = []
  
  for dict_row in list_of_dict_rows:
    row = []
    for column in schema.names:
      if enforce_none_policy and not schema[column].nullable:
        assert column in dict_row, f"The enforce_none_policy is True and the column {column} doesn't accept nulls"
      row.append(list_of_dict_rows.get(column, None)) # insert None if doesn't exist
    final_list_of_rows.append(row)
  
  return spark.createDataFrame(final_list_of_rows, schema)

In [0]:
from pyspark.sql.types import MapType, StringType

@udf(MapType(StringType(),StringType()))
def convert_string_to_json(input_string):

  key_value_pairs = {}

  if input_string is not None:

    split_string = [tuple(x.split(':')) for x in input_string.split(';') if len(tuple(x.split(':'))) == 2]

    for k,v in split_string:
        key_value_pairs[k] = v

  return key_value_pairs

In [0]:
from pyspark.sql.types import MapType, StringType

@udf(MapType(StringType(),StringType()))
def convert_string_to_json_for_target_weights(input_string):

  return json.loads(input_string)

In [0]:
from pyspark.sql.functions import col, lit, current_timestamp
import pandas as pd 

# Check duplicates easily
data \
    .groupby(['Id']) \
    .count() \
    .where('count > 1') \
    .sort('count', ascending= False) \
    .show()

# Select distinct values from given column 	
df_demog = read_demog_updates(running_date)
display(df_demog.select("Name").distinct())

# Check NULL / NOT NULL values
df.where(col("dt_mvmt").isNull())
df.where(col("dt_mvmt").isNotNull())

# Drop Null values
df.na.drop(subset=["dt_mvmt"])

# When with isIn 
display(almost_result.withColumn("Lifestage", 
                           when(col("NumberOfChildren").isin("0","1","2","3","4"), "Young Family 0-4 Years")
                          .when(col("NumberOfChildren").isin("5","6","7","8","9"), "Middle Family 5-9 Years")
                          .when(col("NumberOfChildren").isin("10","11","12","13","14","15"), "Family 10+ Years")
                          .otherwise("Unknown")
             ))		 
			 
# Pyspark between
test_df.filter(col("start").between(pd.to_datetime('2023-04-13'), pd.to_datetime('2023-04-14'))).show()

# Import stuffs
myTest =(spark.read.format("csv")
            .schema(myschema())
            .option("header", "true")
            .option("delimiter", "\ua746")
            .load("your-path"))

# Group by columns aggregating the value and renaming the aggregation			
display(test__df.groupby("Id1","Id2", "Name").agg(min(col("Value").cast(IntegerType()))).withColumnRenamed("min(CAST(Value AS INT))", "minAge"))


#### MERGE EXAMPLE!!! 
time_now = current_timestamp()
deltaTable = DeltaTable.forPath(spark, "your-delta-table-path")

deltaTable.alias("tableA").merge(
    updatesDF_usis.alias("updates"),
    "tableA.Id = updates.Id") \
  .whenMatchedUpdate(set = {'Id': 'updates.Id',
                            'RecipeGroup': 'updates.RecipeGroup',
                            'ShopId': 'updates.ShopId',
                            'Location': 'updates.Location',
                            'Address': 'updates.Address',
                            'Postcode': 'updates.Postcode',
                            'siteURL': 'updates.siteURL',
                            'UpdatedBy': 'updates.UpdatedBy',
                            'CreatedDateTime': 'usis.CreatedDateTime',
                            'UpdatedDateTime': lit(time_now),
                            'CreatedBy': 'usis.CreatedBy',
                            }
) \
  .whenNotMatchedInsert(values = {'Id': 'updates.Id',
                            'RecipeGroup': 'updates.RecipeGroup',
                            'ShopId': 'updates.ShopId',
                            'Location': 'updates.Location',
                            'Address': 'updates.Address',
                            'Postcode': 'updates.Postcode',
                            'siteURL': 'updates.siteURL',
                            'UpdatedBy': 'updates.UpdatedBy',
                            'CreatedDateTime': lit(time_now),
                            'UpdatedDateTime': lit(time_now),
                            'CreatedBy': 'usis.CreatedBy',
                                 }
) \
  .execute()

In [0]:
from pyspark.sql.functions import upper
from pyspark.sql.types import StringType 

"""
UDF (a.k.a User Defined Function) is the most useful feature of Spark SQL & DataFrame that is used to extend the PySpark build in capabilities. You can use with Select, WithColumn, and SQL

Note: UDF’s are the most expensive operations hence use them only you have no choice and when essential logic in spark would be overcomplicated. They are distributable but work like black box and you may face serialization problems when running with a cluster with workers

Pros: You can easily expand your logic

Cons: Catalyst Optimizer does not optimize the code very well. Built-in pyspark functions are more efficient and parallizable friendly

"""
# UDFs return string by default
@udf(returnType=StringType()) 
def upperCase(myString):
    return myString.upper()


columns = ["SeqNo", "Name"]

data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")]

df = spark.createDataFrame(data=data, schema=columns)
df.display()

df2 = df.withColumn("NameUpperCase", upperCase(col("Name")))
df2.display()

SeqNo,Name
1,john jones
2,tracey smith
3,amy sanders


SeqNo,Name,NameUpperCase
1,john jones,JOHN JONES
2,tracey smith,TRACEY SMITH
3,amy sanders,AMY SANDERS


In [0]:
import pyspark.sql.types as T

def product_schema():
    return T.StructType([
        T.StructField("productID", T.StringType(), True), 
        T.StructField("productName", T.StringType(), True), 
        T.StructField("Description", T.StringType(), True),
        T.StructField("SKUID", T.StringType(), True),
        T.StructField("batchInputWeight", T.FloatType(), True),
        T.StructField("isProductLocked", T.BooleanType(), True),
        T.StructField("netWeight", T.FloatType(), True),
        T.StructField("GrossWeightFactor", T.FloatType(), True),
        T.StructField("servingSize", T.FloatType(), True),
        T.StructField("ltmSalesQuantity", T.IntegerType(), True), 
        T.StructField("ltmPurchaseQuantity", T.IntegerType(), True),
        T.StructField("countryCode", T.StringType(), True), 
        T.StructField("site", T.StringType(), True), 
        T.StructField("certifiedImpacts", T.StringType(), True), 
        T.StructField("certifiedStageImpacts", T.StringType(), True), #
        T.StructField("isCalculated", T.StringType(), True),
        T.StructField("ingredientCollections", T.MapType(T.StringType(), T.StringType()), True), 
        T.StructField("salesActivities", T.StringType(),True),
        T.StructField("processingActivities", T.StringType(), True),
        T.StructField("packagingActivities", T.StringType(), True),
        T.StructField("storageActivities", T.StringType(), True),
        T.StructField("transportActivities", T.StringType(), True),
        T.StructField("commentary", T.StringType(), True),
        T.StructField("recipeCommentary", T.StringType(), True),
        T.StructField("saType", T.StringType(), True),
        T.StructField("taxonomyCode", T.StringType(), True),
        T.StructField("taxonomyL1Code", T.StringType(), True),
        T.StructField("taxonomyL2Code", T.StringType(), True),
        T.StructField("specificationVersion", T.FloatType(), True),
        T.StructField("categories", T.StringType(), True) 
        ]
    )

In [0]:
from pyspark.sql.functions import sha2

def pseudonymize_column(df, column_name, hash_length=64):
    """
    Pseudonymize a column in a DataFrame using a hash function.

    Parameters:
        df (pyspark.sql.DataFrame): The input DataFrame.
        column_name (str): The name of the column to be pseudonymized.
        hash_length (int, optional): The length of the hashed output in characters. Default is 64.

    Returns:
        pyspark.sql.DataFrame: A new DataFrame with the pseudonymized column.
    """
    # Using SHA-256 as the hash function, you can choose other hash functions if needed.
    # Here, we use the sha2 function from PySpark to calculate the hash.
    df_pseudonymized = df.withColumn(column_name + "_pseudonymized", sha2(df[column_name], hash_length))

    # Drop the original column if desired.
    df_pseudonymized = df_pseudonymized.drop(column_name)

    # Optionally, you can rename the pseudonymized column to the original column name.
    df_pseudonymized = df_pseudonymized.withColumnRenamed(column_name + "_pseudonymized", column_name)

    return df_pseudonymized

# Example usage:
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Pseudonymization").getOrCreate()

    # Sample data for demonstration purposes.
    data = [("John Doe", 30), ("Jane Smith", 25), ("Bob Johnson", 35)]
    columns = ["name", "age"]

    df = spark.createDataFrame(data, columns)

    # Pseudonymize the 'name' column using the pseudonymize_column function.
    df_pseudonymized = pseudonymize_column(df, "name")

    df_pseudonymized.show()


Some of them are automatically enabled in the version 12.2 LTS of databricks, but it's good to be explicit

- spark.sql.adaptive.enabled true
- spark.databricks.delta.optimizeWrite.enabled true
- spark.sql.adaptive.coalescePartitions.enabled true
- spark.sql.adaptive.skewJoin.enabled true
- spark.databricks.io.cache.enabled true
- spark.databricks.delta.autoCompact.enabled true

- spark.sql.shuffle.partitions auto  
  (200 is the default --> If you data is small, reduce this number otherwise make it bigger - you need to play with numbers to fine tune. By assigning auto you let spark decides what may be good for general purpose workloads or commom ETLs)


When using pandas workloads (toPandas etc) you can leverage the PyArrow serializtion for more efficiency and performance (make sure PyArrow is installed)

- spark.sql.execution.arrow.pyspark.enabled true
- spark.sql.execution.arrow.pyspark.fallback.enabled

Hints: