#### Source: [this article](https://medium.com/@bhanusree.balisetty/from-pandas-to-pyspark-e7188c8276e)

In [1]:
#!pip install pyspark arrow

In [2]:
import pandas as pd
from datetime import timedelta
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.appName('spark_session').getOrCreate()

22/06/08 00:26:17 WARN Utils: Your hostname, vienna resolves to a loopback address: 127.0.1.1; using 10.0.0.32 instead (on interface enp3s0)
22/06/08 00:26:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/08 00:26:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Creating Dataframes

In [4]:
# PANDAS
df1 = [['A1', 'B1', 2, '21-12-2021 10:30'], 
       ['A2', 'B2', 4, '21-12-2021 10:40'], 
       ['A3', 'B3', 5, '21-12-2021 11:00']] 

df1 = pd.DataFrame(df1, columns = ['A', 'B', 'Value', 'Date_Column'])



# PYSPARK
df2 = spark.createDataFrame([('A1', 'B1', 2, '21-12-2021 10:30'),
                            ('A2', 'B2', 4, '21-12-2021 10:40'),
                            ('A3', 'B3', 5, '21-12-2021 11:00')],
                            ['A', 'B', 'Value', 'Date_Column'])


#### Creating New Columns

In [5]:
# PANDAS - New column with Constant values
df1['C'] = 'New Constant'

# PYSPARK - New column with Constant values
df2 = df2.withColumn("C", F.lit('New Constant'))

# PANDAS - New Column using existing columns
df1['C'] = df1['A'] + df1['B']

# PYSPARK - New Column using existing columns
df2 = df2.withColumn("C", F.concat("A", "B"))

#NOTE:
#lit() -- used to create constant columns
#concat() -- concatenate columns of dataframe
#withColumn() -- creates a new column

#### Updating Existing Column Data

In [6]:
# PANDAS - Update Column data
df1['Value'] = df1['Value']**2

# PYSPARK - Update Column data
df2 = df2.withColumn("Value", F.col("Value")**2)

#### Selecting, Filtering Data

In [7]:
# PANDAS - Selecting Columns
new_df = df1[['B', 'C']]

# PYSPARK - Selecting Columns
new_df1 = df2.select("B", "C")

# PANDAS - Filtering rows based on condition
new_df1 = df1[df1['Value']<5]

# PYSPARK - Filtering rows based on condition
new_df2 = df2.filter(df2.Value<5)

#### Column Type Transformations

In [8]:
# PANDAS - Convert Column from String to DateTime format
df1['Date_Column'] =  pd.to_datetime(df1['Date_Column'], format='%d-%m-%Y %H:%M')

# PYSPARK - Convert Column from String to Timestamp format
df2 = df2.withColumn("Date_Column", F.to_timestamp("Date_Column", "dd-MM-yyyy hh:mm"))

#### Rename, Drop Columns

In [9]:
# PANDAS - Rename Columns
df1 = df1.rename(columns={'A': 'Col_A', 'B': 'Col_B'})

# PYSPARK - Rename Columns
df2 = df2.withColumnRenamed("A", "Col_A").withColumnRenamed("B", "Col_B")

# PANDAS - Drop Columns
df1 = df1.drop(['Col_A', 'Col_B'], axis=1)

# PYSPARK - Drop Columns
df2 = df2.drop('A', 'B')

#### Melt Dataframes

In [10]:
# PANDAS
df1 = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                    'B': {0: 1, 1: 3, 2: 5},
                    'C': {0: 2, 1: 4, 2: 6}})

pd.melt(df1, id_vars=['A'], value_vars=['B', 'C'])

Unnamed: 0,A,variable,value
0,a,B,1
1,b,B,3
2,c,B,5
3,a,C,2
4,b,C,4
5,c,C,6


In [12]:
# PYSPARK custom melt function
def melt(df, id_vars, value_vars, var_name="Variable", value_name="Value"):
    _vars_and_vals = F.array(*(F.struct(F.lit(c).alias(var_name),
                                        F.col(c).alias(value_name)) for c in value_vars))
    _tmp = df.withColumn("_vars_and_vals",
                         F.explode(_vars_and_vals))
    cols = id_vars + [F.col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)


df2 = spark.createDataFrame([('a', 1, 2), ('b', 3, 4), ('c', 5, 6)], ['A', 'B', 'C'])

#melt(df2, ['A'], ['B', 'C']).display()

#### Add Interval to a Timestamp Column (Timedelta)

In [13]:
# PANDAS - Add 'Interval' to 'Start_Time'
df1 = pd.DataFrame([['2021-01-10 10:10:00', '00:05'],
                    ['2021-12-10, 05:30:00', '00:15'],
                    ['2021-11-10 11:40:00', '00:20']], 
                   columns = ['Start_Time','Interval'])

df1['Start_Time'] = pd.to_datetime(df1['Start_Time'])
df1['End_Time'] = df1['Start_Time'] + pd.to_timedelta(pd.to_datetime(df1['Interval']).dt.strftime('%H:%M:%S'))

In [14]:
# PYSPARK - Add 'Interval' to 'Start_Time'
df2 = spark.createDataFrame([['2021-01-10 10:10:00', '00:05'], 
                            ['2021-12-10 05:30:00', '00:15'], 
                            ['2021-11-10 11:40:00', '00:20']], 
                           ['Start_Time', 'Interval'])

df2 = df2.withColumn("Start_Time", F.to_timestamp("Start_Time", "yyyy-MM-dd hh:mm:ss"))
df2 = df2.withColumn("End_Time", (F.unix_timestamp("Start_Time") + F.unix_timestamp("Interval", "HH:mm")).cast('timestamp'))

#### Additional Syntax

In [15]:
# PANDAS df
df1 = pd.DataFrame({'A': {0: 'a', 1: 'a', 2: 'c'},
                    'B': {0: 1, 1: 1, 2: 5},
                    'C': {0: 2, 1: 4, 2: 6}})

# PYSPARK df
df2 = spark.createDataFrame([('a', 1, 2), ('a', 1, 4), ('c', 5, 6)],
                            ['A', 'B', 'C'])

# PANDAS - Shape of dataframe
print(df1.shape)

# PYSPARK - Shape of dataframe
print((df2.count(), len(df2.columns)))


# PANDAS - Distinct Values of a Column
df1['A'].unique()

# PYSPARK - Distinct Values of a Column
df2.select('A').distinct()

# PANDAS - Group by Columns - Calculate Aggregate functions
df1.groupby(['A', 'B']).sum()

# PYSPARK - Group by Columns - Calculate Aggregate functions
df2.groupBy("A", "B").agg(F.sum("C"))

(3, 3)


[Stage 0:>                                                          (0 + 8) / 8]

(3, 3)


                                                                                

DataFrame[A: string, B: bigint, sum(C): bigint]