In [1]:
import os
# Exported before the Spark session is created in the next cell, so the flag is
# already part of the process environment by then.
os.environ.update({"PYARROW_IGNORE_TIMEZONE": "1"})

In [2]:
from os.path import abspath
from pyspark.sql import SparkSession

# Absolute path of ./spark-warehouse (resolved against the current working
# directory); handed to Spark below as spark.sql.warehouse.dir.
warehouse_location = abspath('spark-warehouse')

# Build (or reuse, via getOrCreate) a local Spark session with Hive support.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spark_App")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
import pandas as pd

import pyspark.pandas as ps

from IPython.display import display

In [4]:
# Create a Pandas-on-Spark DataFrame

# Column-oriented sample data: 20 synthetic people.
data = {
    # 'Person 1' .. 'Person 20'
    'Name': ['Person ' + str(n) for n in range(1, 21)],
    # 20 .. 39
    'Age': list(range(20, 40)),
    # 'City A' .. 'City T' (the letter wraps around after 'Z')
    'City': [f'City {chr(ord("A") + k % 26)}' for k in range(20)],
}

# Turn the column dict into a pandas-on-Spark DataFrame and preview it.
psdf = ps.DataFrame(data)
display(psdf.head())

# pandas-style operations: summary statistics of the numeric column(s) ...
summary_stats = psdf.describe()
display(summary_stats)

# ... and a single-column aggregate.
mean_age = psdf['Age'].mean()
print(mean_age)

Unnamed: 0,Name,Age,City
0,Person 1,20,City A
1,Person 2,21,City B
2,Person 3,22,City C
3,Person 4,23,City D
4,Person 5,24,City E


Unnamed: 0,Age
count,20.0
mean,29.5
std,5.91608
min,20.0
25%,24.0
50%,29.0
75%,34.0
max,39.0


29.5


In [5]:
# Row filtering, variant 1: index the frame with a boolean mask.
is_over_25 = psdf['Age'] > 25
filtered_df = psdf[is_over_25]
display(filtered_df.head())

# Row filtering, variant 2: the same predicate written as a query string.
filtered_df = psdf.query('Age > 25')
display(filtered_df.head())

# Adding a derived column.
# NOTE: `new_columns` is bound to the same object as `psdf` (no copy is made),
# so the column assigned here is also visible through `psdf` afterwards.
new_columns = psdf
doubled_age = new_columns['Age'] * 2
new_columns['Age_x_2'] = doubled_age
display(new_columns.head())

Unnamed: 0,Name,Age,City
6,Person 7,26,City G
7,Person 8,27,City H
8,Person 9,28,City I
9,Person 10,29,City J
10,Person 11,30,City K


Unnamed: 0,Name,Age,City
6,Person 7,26,City G
7,Person 8,27,City H
8,Person 9,28,City I
9,Person 10,29,City J
10,Person 11,30,City K


Unnamed: 0,Name,Age,City,Age_x_2
0,Person 1,20,City A,40
1,Person 2,21,City B,42
2,Person 3,22,City C,44
3,Person 4,23,City D,46
4,Person 5,24,City E,48


In [6]:
# Applying functions
# NOTE: this binds a second name to the same DataFrame object as `psdf`; no
# copy is made, so columns assigned through `functions_df` also appear on `psdf`.
functions_df = psdf

def add_one(x: int) -> int:
    """Return ``x`` incremented by one.

    The return annotation is functional, not decorative: pandas-on-Spark's
    ``Series.apply`` reads it to determine the result type. Without it, the
    library first has to run the function on a sample of the data to infer
    the type, which costs an extra pass.

    Args:
        x: The value to increment (an integer age in this notebook).

    Returns:
        ``x + 1``.
    """
    return x + 1

# Element-wise apply with a named function.
age_plus_one = functions_df['Age'].apply(add_one)
functions_df['Age_plus_one'] = age_plus_one
display(functions_df.head())

# Element-wise apply with an anonymous (lambda) function.
age_squared = functions_df['Age'].apply(lambda age: age**2)
functions_df['Age_squared'] = age_squared
display(functions_df.head())

Unnamed: 0,Name,Age,City,Age_x_2,Age_plus_one
0,Person 1,20,City A,40,21
1,Person 2,21,City B,42,22
2,Person 3,22,City C,44,23
3,Person 4,23,City D,46,24
4,Person 5,24,City E,48,25


Unnamed: 0,Name,Age,City,Age_x_2,Age_plus_one,Age_squared
0,Person 1,20,City A,40,21,400
1,Person 2,21,City B,42,22,441
2,Person 3,22,City C,44,23,484
3,Person 4,23,City D,46,24,529
4,Person 5,24,City E,48,25,576


In [7]:
# Define a custom function
def square(x: int) -> int:
    """Return ``x`` multiplied by itself.

    The return annotation lets pandas-on-Spark take the result type from the
    signature when this function is passed to ``Series.map`` / ``Series.apply``,
    instead of inferring it by running the function on a sample of the data.

    Args:
        x: The value to square (an integer age in this notebook).

    Returns:
        ``x * x``.
    """
    return x * x

# Map the named function over every age and store the result as a column.
squared_ages = psdf['Age'].map(square)
psdf['Age_squared'] = squared_ages
display(psdf.head())

Unnamed: 0,Name,Age,City,Age_x_2,Age_plus_one,Age_squared
0,Person 1,20,City A,40,21,400
1,Person 2,21,City B,42,22,441
2,Person 3,22,City C,44,23,484
3,Person 4,23,City D,46,24,529
4,Person 5,24,City E,48,25,576


In [8]:
# Order the rows by age, highest first.
sorted_df = psdf.sort_values('Age', ascending=False)
display(sorted_df.head())

Unnamed: 0,Name,Age,City,Age_x_2,Age_plus_one,Age_squared
19,Person 20,39,City T,78,40,1521
18,Person 19,38,City S,76,39,1444
17,Person 18,37,City R,74,38,1369
16,Person 17,36,City Q,72,37,1296
15,Person 16,35,City P,70,36,1225


In [9]:
# Minimum age per name. Selecting the single 'Age' column before aggregating
# yields a Series indexed by 'Name', despite the `_df` suffix.
grouped_df = psdf.groupby(by='Name')['Age'].min()
display(grouped_df.head())

Name
Person 1    20
Person 2    21
Person 3    22
Person 4    23
Person 5    24
Name: Age, dtype: int64

In [10]:
# Two small frames whose 'id' columns overlap only on id == 2.
df1 = ps.DataFrame({'id': [1, 2], 'name': ['Alice', 'Bob']})
df2 = ps.DataFrame({'id': [2, 3], 'age': [30, 25]})

# Inner join on 'id': only ids present on both sides survive.
merged_df = df1.merge(right=df2, how='inner', on='id')
display(merged_df.head())

Unnamed: 0,id,name,age
0,2,Bob,30


In [11]:
# pandas -> pandas-on-Spark
# Start from a plain pandas DataFrame built from the sample dict ...
pdf = pd.DataFrame(data=data)

# ... and convert it. This rebinds `psdf` to a fresh frame created from `pdf`.
psdf = ps.from_pandas(pdf)
display(psdf.head())

# pandas-on-Spark -> pandas
pdf_back = psdf.to_pandas()
display(pdf_back.head())

Unnamed: 0,Name,Age,City
0,Person 1,20,City A
1,Person 2,21,City B
2,Person 3,22,City C
3,Person 4,23,City D
4,Person 5,24,City E




Unnamed: 0,Name,Age,City
0,Person 1,20,City A
1,Person 2,21,City B
2,Person 3,22,City C
3,Person 4,23,City D
4,Person 5,24,City E


In [12]:
# pandas-on-Spark -> Spark DataFrame, then keep only ages above 25 using a
# SQL-expression predicate.
spark_people = psdf.to_spark()
sdf = spark_people.filter("Age > 25")
sdf.show()



+---------+---+------+
|     Name|Age|  City|
+---------+---+------+
| Person 7| 26|City G|
| Person 8| 27|City H|
| Person 9| 28|City I|
|Person 10| 29|City J|
|Person 11| 30|City K|
|Person 12| 31|City L|
|Person 13| 32|City M|
|Person 14| 33|City N|
|Person 15| 34|City O|
|Person 16| 35|City P|
|Person 17| 36|City Q|
|Person 18| 37|City R|
|Person 19| 38|City S|
|Person 20| 39|City T|
+---------+---+------+



In [13]:
# Convert Spark DataFrame to Pandas-on-Spark DataFrame.
# `sdf` is the filtered (Age > 25) Spark DataFrame from the previous cell, and
# `psdf` is rebound to the converted result.
psdf = sdf.pandas_api()
display(psdf.head())

Unnamed: 0,Name,Age,City
0,Person 7,26,City G
1,Person 8,27,City H
2,Person 9,28,City I
3,Person 10,29,City J
4,Person 11,30,City K


In [14]:
# Build a Spark DataFrame from in-memory data, in three hops:

# 1. sample dict -> pandas DataFrame
pdf = pd.DataFrame(data=data)

# 2. pandas DataFrame -> Spark DataFrame
sdf = spark.createDataFrame(data=pdf)

# 3. Spark DataFrame -> pandas-on-Spark DataFrame
psdf = sdf.pandas_api()

In [15]:
# Drop down to the Spark DataFrame API so the data can be queried with SQL.
df_spark = psdf.to_spark()

# Register the frame as a temporary view under a descriptive name. Avoid view
# names such as `table`: TABLE is a SQL keyword and is reserved when Spark's
# ANSI reserved-keyword enforcement is switched on, which would make the
# unquoted name fail to parse.
df_spark.createOrReplaceTempView('people')

# Everyone younger than 27.
result = spark.sql('SELECT * FROM people WHERE Age < 27')
result.show()



+--------+---+------+
|    Name|Age|  City|
+--------+---+------+
|Person 1| 20|City A|
|Person 2| 21|City B|
|Person 3| 22|City C|
|Person 4| 23|City D|
|Person 5| 24|City E|
|Person 6| 25|City F|
|Person 7| 26|City G|
+--------+---+------+

