In [2]:
# Set the PySpark environment variables
import os

# Only supply SPARK_HOME when the environment does not already define it, so
# an existing Spark installation on another machine is not overridden by this
# user-specific path. The original path is kept as the fallback value.
os.environ.setdefault('SPARK_HOME', "/Users/drewwenturine/Coding/Apps/Spark")

# The remaining PySpark settings are assigned unconditionally, as before.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [52]:
# Import PySpark
from pyspark.sql import SparkSession
# import the lit function (wraps a literal value as a Column)
from pyspark.sql.functions import lit
# import the concat function (joins several columns into one)
from pyspark.sql.functions import concat
# import the coalesce function (first non-null value among its columns)
from pyspark.sql.functions import coalesce
# import the pyspark.sql.functions module under the alias F
from pyspark.sql import functions as F

In [4]:
# Obtain the SparkSession for this notebook under the application name
# "pyspark-get-started".
spark = (
    SparkSession.builder
    .appName("pyspark-get-started")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/15 10:35:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [46]:
# Build the two demo DataFrames used by the cells below.

# People: (Name, Age)
data = [
    ("Drew", 12),
    ("Bob", 30),
    ("Charlie", 35),
    ("Woody", 42),
]
# Lookup table: (Age, Age_Bucket). There is no row for age 42.
age_buckets = [
    (12, "10-20"),
    (30, "30-40"),
    (35, "30-40"),
]

df = spark.createDataFrame(data, schema=["Name", "Age"])
df_buckets = spark.createDataFrame(age_buckets, schema=["Age", "Age_Bucket"])

# For each frame in turn, print its Python type, its schema and its rows.
for frame in (df, df_buckets):
    print(type(frame))
    frame.printSchema()
    frame.show()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)

+-------+---+
|   Name|Age|
+-------+---+
|   Drew| 12|
|    Bob| 30|
|Charlie| 35|
|  Woody| 42|
+-------+---+

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Age: long (nullable = true)
 |-- Age_Bucket: string (nullable = true)

+---+----------+
|Age|Age_Bucket|
+---+----------+
| 12|     10-20|
| 30|     30-40|
| 35|     30-40|
+---+----------+



In [7]:
# Select
# Keep only the "Name" column of df.
#
# Glue Method (for comparison):
#   df_sel = df.select_fields(["Name"])

# PySpark Method
columns_to_keep = ["Name"]
df_sel = df.select(*columns_to_keep)

df_sel.show()

+-------+
|   Name|
+-------+
|   Drew|
|    Bob|
|Charlie|
+-------+



In [9]:
# Drop
# Start from the "Name" and "Age" columns and remove "Name".
#
# Glue Method (for comparison):
#   df_drop.drop_fields(["Name"])

# PySpark Method
df_drop = df.select("Name", "Age").drop("Name")

df_drop.show()

+---+
|Age|
+---+
| 12|
| 30|
| 35|
+---+



In [14]:
# Rename Columns
# Rename "Name" to "Names"; "Age" is left as it is.
#
# Glue Method (for comparison). Each mapping entry reads
# (source name, source type, target name, target type):
#   mapping = [("Name", "string", "Names", "string"), ("Age", "long", "Age", "long")]
#
#   df_map = ApplyMapping.apply(
#       frame = df_rename,
#       mappings = mapping,
#       transformation_ctx = 'applymapping1'
#   )
#
#   df_map.show()

# PySpark Method
df_rename = df.select("Name", "Age").withColumnRenamed("Name", "Names")

df_rename.show()

+-------+---+
|  Names|Age|
+-------+---+
|   Drew| 12|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [41]:
# Filter
# Keep only the rows whose Name is "Drew".
#
# Glue Method (for comparison):
#   df_filter = Filter.apply(
#       frame = df_filter,
#       f = lambda x: x["Name"] == "Drew"
#   )

# PySpark Method
# The same filter written as a column expression instead of a SQL string:
#   df_filter = df_filter.filter(df_filter.Name == "Drew")
df_filter = df.select("Name", "Age").where("Name == 'Drew'")

df_filter.show()

+----+---+
|Name|Age|
+----+---+
|Drew| 12|
+----+---+



In [37]:
# Add Column
# Uses lit and concat, which must already be imported from
# pyspark.sql.functions.
#
# Glue Method: none is given here (the original note records this as
# not possible with the Glue method).

# PySpark Method
df_new_column = (
    df.select("Name", "Age")
    # Constant column: every row receives the same literal string.
    .withColumn("Date", lit("2023-08-15"))
    # Derived column: Name, the separator " - ", then Age.
    .withColumn("Name-Age", concat("Name", lit(" - "), "Age"))
)

df_new_column.show()

+-------+---+----------+------------+
|   Name|Age|      Date|    Name-Age|
+-------+---+----------+------------+
|   Drew| 12|2023-08-15|   Drew - 12|
|    Bob| 30|2023-08-15|    Bob - 30|
|Charlie| 35|2023-08-15|Charlie - 35|
+-------+---+----------+------------+



In [39]:
# Grouping/Aggregate
# Count how many rows exist for each distinct Name.

# PySpark Method
df_group = df.select("Name", "Age").groupBy("Name").count()
# Optional follow-up kept from the original, disabled: re-group the counted
# frame and sum its numeric columns.
#   df_group = df_group.groupBy("Name").sum()

df_group.show()

+-------+----------+
|   Name|sum(count)|
+-------+----------+
|   Drew|         1|
|    Bob|         1|
|Charlie|         1|
+-------+----------+



In [63]:
# Join
# Attach an Age_Bucket to each person by matching df's Age against
# df_buckets' Age.
df_join = df.select("Name", "Age")

# Glue Method (for comparison):
#   df_join = df.join(["Age"],["Age"],df_buckets)
#
#   df_join.show()

# PySpark Method
# reading from S3 (kept for reference, not executed):
#   new_df = glueContext.create_dynamic_frame.from_catalog(
#       database = "db",
#       table_name = "table"
#   ).toDF()

# Both joins below share this equality condition on the two Age columns.
age_matches = df_join.Age == df_buckets.Age

# inner: only rows with a match on both sides. Because the join is on an
# expression, the result carries an Age column from each input.
df_inner_join = df_join.join(df_buckets, on=age_matches, how="inner")

df_inner_join.show()

# left: every row of df_join is kept.
# NOTE(review): the Age column dropped here is the one from df_join, so the
# remaining Age is the df_buckets one and is null for people with no matching
# bucket row -- confirm that this is intended.
df_left_join = df_join.join(df_buckets, on=age_matches, how="left").drop(df_join.Age)

print("Before default values for NULLs")
df_left_join.show()

# 2 ways to fill nulls:
#   1. coalesce() into a new column, falling back to a literal default.
#      NOTE(review): the default is a string while Age is numeric.
#   2. fillna() with a per-column mapping of replacement values.
df_left_join = (
    df_left_join
    .withColumn("Age_defaulted", coalesce(F.col("Age"), lit("someDefaultValue")))
    .fillna({'Age': -1, 'Age_Bucket': 'unknown'})
)

print("After default values for NULLs")
df_left_join.show()




+-------+---+---+----------+
|   Name|Age|Age|Age_Bucket|
+-------+---+---+----------+
|   Drew| 12| 12|     10-20|
|    Bob| 30| 30|     30-40|
|Charlie| 35| 35|     30-40|
+-------+---+---+----------+

Before default values for NULLs
+-------+----+----------+
|   Name| Age|Age_Bucket|
+-------+----+----------+
|   Drew|  12|     10-20|
|    Bob|  30|     30-40|
|Charlie|  35|     30-40|
|  Woody|null|      null|
+-------+----+----------+

After default values for NULLs
+-------+---+----------+----------------+
|   Name|Age|Age_Bucket|   Age_defaulted|
+-------+---+----------+----------------+
|   Drew| 12|     10-20|              12|
|    Bob| 30|     30-40|              30|
|Charlie| 35|     30-40|              35|
|  Woody| -1|   unknown|someDefaultValue|
+-------+---+----------+----------------+



In [None]:
# write spark df to S3
# This cell uses the AWS Glue Python library (awsglue). Its imports are kept
# inside the cell so that the rest of the notebook does not depend on awsglue.

# Import the Glue context and Dynamic DataFrame classes
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

# The original cell used `glueContext` and `sparkDf` without defining them
# anywhere in the notebook, which raises NameError. Build the Glue context on
# top of the Spark session created earlier.
glueContext = GlueContext(spark.sparkContext)

# Spark DataFrame to export.
# NOTE(review): `df` (the Name/Age frame built earlier) is used as the source;
# replace it with whichever DataFrame should actually be written.
sparkDf = df

# Convert from Spark Data Frame to Glue Dynamic Frame
dyfCustomersConvert = DynamicFrame.fromDF(sparkDf, glueContext, "convert")

# Show converted Glue Dynamic Frame
dyfCustomersConvert.show()


# Write the converted Dynamic Frame to an S3 location as comma-separated CSV.
# The bucket name in the path is a placeholder that must be filled in.
glueContext.write_dynamic_frame.from_options(
    frame=dyfCustomersConvert,
    connection_type="s3",
    connection_options={"path": "s3://<YOUR_S3_BUCKET_NAME>/write_down_dyf_to_s3"},
    format="csv",
    format_options={
        "separator": ","
    },
    transformation_ctx="datasink2",
)

# Write the converted Dynamic Frame to the customers_write_dyf table using the
# metadata stored in the Glue data catalog.
glueContext.write_dynamic_frame.from_catalog(
    frame=dyfCustomersConvert,
    database="pyspark_tutorial_db",
    table_name="customers_write_dyf",
)

