# PySpark Basics

Welcome to **PySpark Basics**!  
This notebook introduces you to **Apache Spark’s Python API (PySpark)** for large-scale data processing.

You’ll learn how to:
- Initialize a SparkSession  
- Create and manipulate DataFrames & RDDs  
- Apply transformations and actions  
- Perform aggregations and joins  
- Use SQL queries on Spark DataFrames  

---


In [27]:
import os, sys

# ✅ Ensure Spark uses the same Python you're running this notebook with
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# (Optional) if Spark is installed manually, uncomment and set these:
# os.environ["SPARK_HOME"] = "C:\\spark"
# os.environ["HADOOP_HOME"] = "C:\\hadoop"

print("Using Python:", sys.executable)


Using Python: C:\Users\ksiri\anaconda3\envs\sparkenv\python.exe


In [38]:
from pyspark.sql import SparkSession

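spark = (
    SparkSession.builder
    .appName("PySpark without Hadoop binaries")
    # Explicitly map file:// paths to Hadoop's local filesystem implementations
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.file.impl", "org.apache.hadoop.fs.local.LocalFs")
    # Umask 000: no permission bits are masked on files Spark creates locally
    .config("spark.hadoop.fs.permissions.umask-mode", "000")
    # Reuse the running session if there is one, otherwise start a new one
    .getOrCreate()
)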

print("✅ Spark started successfully!")
print("Spark version:", spark.version)


✅ Spark started successfully!
Spark version: 4.0.1


## Creating DataFrames

A **DataFrame** in PySpark is a table of named, typed columns, similar to a pandas DataFrame. Unlike pandas, its data is distributed across a cluster, and operations on it are evaluated lazily.

You can create DataFrames from:
1. Python lists or dictionaries  
2. External sources like CSV, JSON, Parquet, or databases


In [39]:
from pyspark.sql import Row

data = [
    Row(name="Alice", age=25, city="New York"),
    Row(name="Bob", age=30, city="San Francisco"),
    Row(name="Charlie", age=35, city="Chicago")
]

df = spark.createDataFrame(data)
df.show()


+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|  Alice| 25|     New York|
|    Bob| 30|San Francisco|
|Charlie| 35|      Chicago|
+-------+---+-------------+



## Reading Data from CSV/JSON

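PySpark's `spark.read` interface supports many file formats, including CSV, JSON, and Parquet. The cell below downloads the seaborn `tips` dataset and reads it as CSV: `header=True` takes column names from the first line, and `inferSchema=True` makes Spark scan the data to guess each column's type, at the cost of an extra pass over the file.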


In [40]:
import urllib.request

url = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/tips.csv"
local_path = "tips.csv"

urllib.request.urlretrieve(url, local_path)
print("✅ File downloaded successfully!")

tips_df = spark.read.csv(local_path, header=True, inferSchema=True)
tips_df.show(5)


✅ File downloaded successfully!
+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows


## DataFrame Operations

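DataFrames support selecting, filtering, sorting, and aggregating data. Each of these operations is a transformation that returns a new DataFrame; Spark does no work until an action such as `show()` or `count()` runs.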

Common functions:
- `select()`
- `filter()` / `where()`
- `groupBy()`
- `orderBy()`


In [41]:
from pyspark.sql.functions import col

# Select a subset of columns
tips_df.select("sex", "total_bill", "tip").show(5)

# Filter rows where tip > 5
tips_df.filter(col("tip") > 5).show(5)

# Sort by total_bill, largest first
tips_df.orderBy(col("total_bill").desc()).show(5)


+------+----------+----+
|   sex|total_bill| tip|
+------+----------+----+
|Female|     16.99|1.01|
|  Male|     10.34|1.66|
|  Male|     21.01| 3.5|
|  Male|     23.68|3.31|
|Female|     24.59|3.61|
+------+----------+----+
only showing top 5 rows
+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     39.42|7.58|  Male|    No|Sat|Dinner|   4|
|      30.4| 5.6|  Male|    No|Sun|Dinner|   4|
|      32.4| 6.0|  Male|    No|Sun|Dinner|   4|
|     34.81| 5.2|Female|    No|Sun|Dinner|   4|
|     48.27|6.73|  Male|    No|Sat|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows
+----------+----+----+------+---+------+----+
|total_bill| tip| sex|smoker|day|  time|size|
+----------+----+----+------+---+------+----+
|     50.81|10.0|Male|   Yes|Sat|Dinner|   3|
|     48.33| 9.0|Male|    No|Sat|Dinner|   4|
|     48.27|6.73|Male|    No|Sat|Dinner|   4|
|     48.17| 5.0|Ma

## Aggregations

Group rows with `groupBy()`, then summarize each group inside `agg()` with aggregate functions such as `avg()`, `sum()`, and `count()`.


In [42]:
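# Import the functions module under an alias so PySpark's sum()
# does not shadow Python's built-in sum()
from pyspark.sql import functions as F

tips_df.groupBy("day").agg(
    F.count("*").alias("total_records"),
    F.avg("tip").alias("avg_tip"),
    F.sum("total_bill").alias("total_sales")
).show()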


+----+-------------+-----------------+------------------+
| day|total_records|          avg_tip|       total_sales|
+----+-------------+-----------------+------------------+
|Thur|           62|2.771451612903226|1096.3299999999997|
| Sun|           76|3.255131578947369|1627.1600000000003|
| Sat|           87|2.993103448275862|1778.3999999999996|
| Fri|           19|2.734736842105263|325.87999999999994|
+----+-------------+-----------------+------------------+



## Adding and Modifying Columns

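Use `withColumn()` to add a column, or to replace an existing column with the same name. DataFrames are immutable, so `withColumn()` returns a new DataFrame that you must assign back.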


In [43]:
# Add a new column: tip percentage
tips_df = tips_df.withColumn("tip_percent", (col("tip") / col("total_bill")) * 100)
tips_df.select("total_bill", "tip", "tip_percent").show(5)


+----------+----+------------------+
|total_bill| tip|       tip_percent|
+----------+----+------------------+
|     16.99|1.01|5.9446733372572105|
|     10.34|1.66|16.054158607350097|
|     21.01| 3.5|16.658733936220845|
|     23.68|3.31| 13.97804054054054|
|     24.59|3.61|14.680764538430255|
+----------+----+------------------+
only showing top 5 rows


## DataFrame Joins

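Use `join()` to combine two DataFrames on a shared key column. The `how` argument selects the join type, such as `"inner"`, `"left"`, `"right"`, or `"outer"`.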


In [44]:
from pyspark.sql import Row

# Lookup table mapping each city to its state
city_data = [
    Row(city="New York", state="NY"),
    Row(city="Chicago", state="IL"),
    Row(city="San Francisco", state="CA")
]
city_df = spark.createDataFrame(city_data)

# Join the people DataFrame `df` (from the earlier cell) with city_df on their shared `city` column
joined_df = df.join(city_df, on="city", how="inner")
joined_df.show()


+-------------+-------+---+-----+
|         city|   name|age|state|
+-------------+-------+---+-----+
|      Chicago|Charlie| 35|   IL|
|     New York|  Alice| 25|   NY|
|San Francisco|    Bob| 30|   CA|
+-------------+-------+---+-----+



## Using SQL Queries

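Register a DataFrame as a temporary view with `createOrReplaceTempView()`, then query it with SQL through `spark.sql()`. The view exists only for the lifetime of the current SparkSession.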


In [45]:
# Register tips_df as a temporary view named "tips"
tips_df.createOrReplaceTempView("tips")

# Run a SQL query; the result is a regular DataFrame
sql_result = spark.sql("""
SELECT day, ROUND(AVG(tip), 2) AS avg_tip, COUNT(*) AS count
FROM tips
GROUP BY day
ORDER BY avg_tip DESC
""")
sql_result.show()


+----+-------+-----+
| day|avg_tip|count|
+----+-------+-----+
| Sun|   3.26|   76|
| Sat|   2.99|   87|
|Thur|   2.77|   62|
| Fri|   2.73|   19|
+----+-------+-----+



## RDDs (Resilient Distributed Datasets)

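Before the DataFrame API existed, Spark's main abstraction was the **RDD**, a low-level, immutable collection of objects partitioned across the cluster.

DataFrames are usually preferred today because Spark can optimize their queries, but basic RDD operations are still worth knowing.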


In [46]:
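rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# map() is a transformation: it is lazy and only describes a new RDD
rdd_squared = rdd.map(lambda x: x * x)

# collect() is an action: it runs the job and returns the results to the driver
print("RDD elements:", rdd_squared.collect())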


RDD elements: [1, 4, 9, 16, 25]


## Saving Data

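Write DataFrames to local or cloud storage with `df.write`, in formats such as CSV, Parquet, and JSON. The save mode controls what happens when the output path already exists: `"overwrite"` replaces it, `"append"` adds to it, `"ignore"` skips the write, and `"error"` (the default, also spelled `"errorifexists"`) raises an error.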


In [None]:
# Write as Parquet (Spark creates a directory of part files) and read it back
tips_df.write.mode("overwrite").parquet("tips_parquet")
parquet_df = spark.read.parquet("tips_parquet")
parquet_df.show(5)

## Summary

In this notebook, you learned:
- How to create and manipulate Spark DataFrames  
- Basic transformations and actions  
- SQL queries on Spark  
- Simple joins, aggregations, and file operations  

Next Steps:
👉 Move to advanced PySpark: window functions, UDFs, optimization, and pipeline building.
