In [10]:
%profile "de"

Previous profile: de
Setting new profile to: de


In [12]:
%region "us-east-1"

Previous region: us-east-1
Setting new region to: us-east-1
Reauthenticating Glue client with new region: us-east-1
IAM role has been set to arn:aws:iam::605516946663:role/LakeFormationWorkflowRole. Reauthenticating.
Authenticating with profile=de
glue_role_arn defined by user: arn:aws:iam::605516946663:role/LakeFormationWorkflowRole
Authentication done.
Region is set to: us-east-1


In [14]:
%glue_version 3.0

Setting Glue version to: 3.0


In [16]:
%number_of_workers 2

Previous number of workers: 2
Setting new number of workers to: 2


In [18]:
%iam_role arn:aws:iam::605516946663:role/LakeFormationWorkflowRole

Current iam_role is arn:aws:iam::605516946663:role/LakeFormationWorkflowRole
iam_role has been set to arn:aws:iam::605516946663:role/LakeFormationWorkflowRole.


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)




In [2]:
# PANDAS
import pandas as pd
from datetime import timedelta
# PYSPARK
from pyspark.sql import SparkSession
from pyspark.sql import functions as F 
spark=SparkSession.builder.appName('spark_session').getOrCreate()




Creating Dataframes

In [5]:
# PANDAS
df_pandas = [['A1', 'B1', 2, '21-12-2021 10:30'], 
      ['A2', 'B2', 4, '21-12-2021 10:40'], 
      ['A3', 'B3', 5, '21-12-2021 11:00']] # Rows
df_pandas = pd.DataFrame(df_pandas, columns = ['A', 'B', 'Value', 'Date_Column'])
# PYSPARK
df_pyspark = spark.createDataFrame(
    [('A1', 'B1', 2, '21-12-2021 10:30'), 
     ('A2', 'B2', 4, '21-12-2021 10:40'), 
     ('A3', 'B3', 5, '21-12-2021 11:00')], # Rows
    ['A', 'B', 'Value', 'Date_Column'] # Columns
)
# NOTE: There are multiple ways of creating dataframes in both the libraries




In [8]:
df_pandas.head()

    A   B  Value       Date_Column
0  A1  B1      2  21-12-2021 10:30
1  A2  B2      4  21-12-2021 10:40
2  A3  B3      5  21-12-2021 11:00


In [11]:
df_pyspark.printSchema()

root
 |-- A: string (nullable = true)
 |-- B: string (nullable = true)
 |-- Value: long (nullable = true)
 |-- Date_Column: string (nullable = true)


In [13]:
df_pyspark.show()

+---+---+-----+----------------+
|  A|  B|Value|     Date_Column|
+---+---+-----+----------------+
| A1| B1|    2|21-12-2021 10:30|
| A2| B2|    4|21-12-2021 10:40|
| A3| B3|    5|21-12-2021 11:00|
+---+---+-----+----------------+


Creating New Columns

In [14]:
# PANDAS - New column with Constant values
df_pandas['C'] = 'New Constant'
# PYSPARK - New column with Constant values
df_pyspark = df_pyspark.withColumn("C", F.lit('New Constant'))
# PANDAS - New Column using existing columns
df_pandas['C'] = df_pandas['A']+df_pandas['B']
# PYSPARK - New Column using existing columns
df_pyspark = df_pyspark.withColumn("C", F.concat("A", "B"))
# NOTE:
# lit() -- used to create constant columns
# concat() -- concatenate columns of dataframe
# withColumn() -- creates a new column




In [15]:
df_pandas.head()

    A   B  Value       Date_Column     C
0  A1  B1      2  21-12-2021 10:30  A1B1
1  A2  B2      4  21-12-2021 10:40  A2B2
2  A3  B3      5  21-12-2021 11:00  A3B3


In [16]:
df_pyspark.show()

+---+---+-----+----------------+----+
|  A|  B|Value|     Date_Column|   C|
+---+---+-----+----------------+----+
| A1| B1|    2|21-12-2021 10:30|A1B1|
| A2| B2|    4|21-12-2021 10:40|A2B2|
| A3| B3|    5|21-12-2021 11:00|A3B3|
+---+---+-----+----------------+----+


Updating Existing Column Data

In [18]:
# PANDAS - Update Column data
df_pandas['Value'] = df_pandas['Value']**2
# PYSPARK - Update Column data
df_pyspark = df_pyspark.withColumn("Value", F.col("Value")**2)




In [19]:
df_pandas.head()

    A   B  Value       Date_Column     C
0  A1  B1      4  21-12-2021 10:30  A1B1
1  A2  B2     16  21-12-2021 10:40  A2B2
2  A3  B3     25  21-12-2021 11:00  A3B3


In [20]:
df_pyspark.show()

+---+---+-----+----------------+----+
|  A|  B|Value|     Date_Column|   C|
+---+---+-----+----------------+----+
| A1| B1|  4.0|21-12-2021 10:30|A1B1|
| A2| B2| 16.0|21-12-2021 10:40|A2B2|
| A3| B3| 25.0|21-12-2021 11:00|A3B3|
+---+---+-----+----------------+----+


Selecting, Filtering Data

In [21]:
# PANDAS - Selecting Columns
new_df_pandas = df_pandas[['B', 'C']]
# PYSPARK - Selecting Columns
new_df_pyspark = df_pyspark.select("B", "C")

# PANDAS - Filtering rows based on condition
new_df_pandas = df_pandas[df_pandas['Value']<5]
# PYSPARK - Filtering rows based on condition
new_df_pyspark = df_pyspark.filter(df_pyspark.Value<5)




In [23]:
new_df_pandas.head()

    A   B  Value       Date_Column     C
0  A1  B1      4  21-12-2021 10:30  A1B1


In [24]:
new_df_pyspark.show()

+---+---+-----+----------------+----+
|  A|  B|Value|     Date_Column|   C|
+---+---+-----+----------------+----+
| A1| B1|  4.0|21-12-2021 10:30|A1B1|
+---+---+-----+----------------+----+


Column Type Transformations

In [27]:
# PANDAS - Convert Column from String to DateTime format
df_pandas['Date_Column'] =  pd.to_datetime(df_pandas['Date_Column'], format='%d-%m-%Y %H:%M')
# PYSPARK - Convert Column from String to Timestamp format
df_pyspark = df_pyspark.withColumn("Date_Column", F.to_timestamp("Date_Column", "dd-MM-yyyy hh:mm"))




In [28]:
df_pandas.head()

    A   B  Value         Date_Column     C
0  A1  B1      4 2021-12-21 10:30:00  A1B1
1  A2  B2     16 2021-12-21 10:40:00  A2B2
2  A3  B3     25 2021-12-21 11:00:00  A3B3


In [29]:
df_pyspark.show()

+---+---+-----+-------------------+----+
|  A|  B|Value|        Date_Column|   C|
+---+---+-----+-------------------+----+
| A1| B1|  4.0|2021-12-21 10:30:00|A1B1|
| A2| B2| 16.0|2021-12-21 10:40:00|A2B2|
| A3| B3| 25.0|2021-12-21 11:00:00|A3B3|
+---+---+-----+-------------------+----+


Rename, Drop Columns

In [30]:
# PANDAS - Rename Columns
df_pandas = df_pandas.rename(columns={'A': 'Col_A', 'B': 'Col_B'})
# PYSPARK - Rename Columns
df_pyspark = df_pyspark.withColumnRenamed("A", "Col_A").withColumnRenamed("B", "Col_B")




In [31]:
df_pandas.head()

  Col_A Col_B  Value         Date_Column     C
0    A1    B1      4 2021-12-21 10:30:00  A1B1
1    A2    B2     16 2021-12-21 10:40:00  A2B2
2    A3    B3     25 2021-12-21 11:00:00  A3B3


In [38]:
df_pyspark.show()

+-----+-----+-----+-------------------+----+
|Col_A|Col_B|Value|        Date_Column|   C|
+-----+-----+-----+-------------------+----+
|   A1|   B1|  4.0|2021-12-21 10:30:00|A1B1|
|   A2|   B2| 16.0|2021-12-21 10:40:00|A2B2|
|   A3|   B3| 25.0|2021-12-21 11:00:00|A3B3|
+-----+-----+-----+-------------------+----+


In [40]:
# PANDAS - Drop Columns
df_pandas = df_pandas.drop(['Col_A', 'Col_B'], axis=1)
# PYSPARK - Drop Columns
df_pyspark = df_pyspark.drop('Col_A', 'Col_B')

KeyError: "['Col_A' 'Col_B'] not found in axis"


In [41]:
df_pandas.head()

   Value         Date_Column     C
0      4 2021-12-21 10:30:00  A1B1
1     16 2021-12-21 10:40:00  A2B2
2     25 2021-12-21 11:00:00  A3B3


In [42]:
df_pyspark.show()

+-----+-----+-----+-------------------+----+
|Col_A|Col_B|Value|        Date_Column|   C|
+-----+-----+-----+-------------------+----+
|   A1|   B1|  4.0|2021-12-21 10:30:00|A1B1|
|   A2|   B2| 16.0|2021-12-21 10:40:00|A2B2|
|   A3|   B3| 25.0|2021-12-21 11:00:00|A3B3|
+-----+-----+-----+-------------------+----+


Melt Dataframes

In [43]:
# PANDAS
df1 = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})




In [44]:
df1.head()

   A  B  C
0  a  1  2
1  b  3  4
2  c  5  6


In [45]:
pd.melt(df1, id_vars=['A'], value_vars=['B', 'C'])

   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6


There is no direct version of pd.melt available in Pyspark (Release 3.0)



In [46]:
# PYSPARK custom melt function
def melt(df, 
         id_vars, 
         value_vars, 
         var_name="Variable", 
         value_name="Value"):
    _vars_and_vals = F.array(*(F.struct(F.lit(c).alias(var_name),
                   F.col(c).alias(value_name)) for c in value_vars))
    _tmp = df.withColumn("_vars_and_vals",
                         F.explode(_vars_and_vals))
    cols = id_vars + [
            F.col("_vars_and_vals")[x].alias(x) for x in [var_name,
            value_name]]
    return _tmp.select(*cols)




In [48]:
df = spark.createDataFrame(
    [('a', 1, 2), ('b', 3, 4), ('c', 5, 6)], # Rows
    ['A', 'B', 'C'] # Columns
)
melt(df, ['A'], ['B', 'C']).show()

+---+--------+-----+
|  A|Variable|Value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+


Add Interval to a Timestamp Column (Timedelta)

In [49]:
# PANDAS - Add 'Interval' to 'Start_Time'
df2 = pd.DataFrame([['2021-01-10 10:10:00', '00:05'],
                   ['2021-12-10, 05:30:00', '00:15'],
                   ['2021-11-10 11:40:00', '00:20']], 
                   columns = ['Start_Time','Interval'])
df2['Start_Time'] = pd.to_datetime(df2['Start_Time'])
df2['End_Time'] = df2['Start_Time'] + \
                 pd.to_timedelta(pd.to_datetime(df2['Interval']).dt.\
                 strftime('%H:%M:%S'))




In [50]:
df2.head()

           Start_Time Interval            End_Time
0 2021-01-10 10:10:00    00:05 2021-01-10 10:15:00
1 2021-12-10 05:30:00    00:15 2021-12-10 05:45:00
2 2021-11-10 11:40:00    00:20 2021-11-10 12:00:00


There is no inbuilt method similar to “to_timedelta” in Pyspark till date. This is an alternate way of doing it in Pyspark.

In [51]:
# PYSPARK - Add 'Interval' to 'Start_Time'
df = spark.createDataFrame(
    [['2021-01-10 10:10:00', '00:05'], 
     ['2021-12-10 05:30:00', '00:15'], 
     ['2021-11-10 11:40:00', '00:20']], # Rows
     ['Start_Time', 'Interval'] # Columns
)
df = df.withColumn("Start_Time", 
                   F.to_timestamp("Start_Time", 
                                  "yyyy-MM-dd hh:mm:ss"))
df = df.withColumn("End_Time", (F.unix_timestamp("Start_Time") + F.unix_timestamp("Interval", "HH:mm")).cast('timestamp'))




In [52]:
df.show()

+-------------------+--------+-------------------+
|         Start_Time|Interval|           End_Time|
+-------------------+--------+-------------------+
|2021-01-10 10:10:00|   00:05|2021-01-10 10:15:00|
|2021-12-10 05:30:00|   00:15|2021-12-10 05:45:00|
|2021-11-10 11:40:00|   00:20|2021-11-10 12:00:00|
+-------------------+--------+-------------------+


other syntax

In [53]:
# PANDAS df
df11 = pd.DataFrame({'A': {0: 'a', 1: 'a', 2: 'c'},
                   'B': {0: 1, 1: 1, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})
# PYSPARK df
df22 = spark.createDataFrame(
    [('a', 1, 2), ('a', 1, 4), ('c', 5, 6)], # Rows
    ['A', 'B', 'C'] # Columns
)




In [54]:
# PANDAS - Shape of dataframe
print(df11.shape)
# PYSPARK - Shape of dataframe
print((df22.count(), len(df22.columns)))

(3, 3)
(3, 3)


In [55]:
# PANDAS - Distinct Values of a Column
df11['A'].unique()
# PYSPARK - Distinct Values of a Column
df22.select('A').distinct()

DataFrame[A: string]


In [56]:
# PANDAS - Group by Columns - Calculate Aggregate functions
df11.groupby(['A', 'B']).sum()

     C
A B   
a 1  6
c 5  6


In [58]:
# PYSPARK - Group by Columns - Calculate Aggregate functions
df22.groupBy("A", "B").agg(F.sum("C")).show()

+---+---+------+
|  A|  B|sum(C)|
+---+---+------+
|  a|  1|     6|
|  c|  5|     6|
+---+---+------+


While Pandas is still favourite choice of many data scientists, one can comfortably use it for smaller datasets which do not consume too much memory. Pyspark can be used for larger datasets where distributed computing is used at its best!