<a href="https://colab.research.google.com/github/omik582004/Big_Data_2025_Semnan/blob/main/PySpark_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Using PySpark in Google Colab**

This notebook sets up and runs PySpark in Google Colab.

## Using Apache Spark with PySpark in Google Colab
* To use Apache Spark with Python via the PySpark library in Google Colab, follow these steps:



In [None]:
# Step 1: Install Java and PySpark
!apt-get install openjdk-11-jdk -y
!pip install pyspark

In [None]:
# Step 2: Import PySpark and Create a Spark Session
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("ColabSpark").getOrCreate()

# Print Spark version
print("Apache Spark version:", spark.version)

Apache Spark version: 3.5.5


In [None]:
# Step 3: Create a Sample DataFrame
from pyspark.sql import Row

# Create sample data
data = [
    Row(id=1, name="Alice", age=25),
    Row(id=2, name="Bob", age=30),
    Row(id=3, name="Charlie", age=35),
    Row(id=4, name="Arash", age=25),
    Row(id=5, name="Mohaddaseh", age=23),
    Row(id=6, name="Hashem", age=24),
    Row(id=3, name="Reza", age=35),
    Row(id=3, name="Zahra", age=24)
]

# Convert to a Spark DataFrame
df = spark.createDataFrame(data)

# Show the data
df.show()

+---+----------+---+
| id|      name|age|
+---+----------+---+
|  1|     Alice| 25|
|  2|       Bob| 30|
|  3|   Charlie| 35|
|  4|     Arash| 25|
|  5|Mohaddaseh| 23|
|  6|    Hashem| 24|
|  3|      Reza| 35|
|  3|     Zahra| 24|
+---+----------+---+



In [None]:
# Get the number of rows
num_rows = df.count()

# Get the number of columns
num_cols = len(df.columns)

# Print the dimensions
print("Shape: ({}, {})".format(num_rows, num_cols))

Shape: (8, 3)


In [None]:
# Step 4: Perform Basic Data Operations

# Print schema
df.printSchema()


root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [None]:
# Select specific columns
df.select("name", "age").show()

+----------+---+
|      name|age|
+----------+---+
|     Alice| 25|
|       Bob| 30|
|   Charlie| 35|
|     Arash| 25|
|Mohaddaseh| 23|
|    Hashem| 24|
|      Reza| 35|
|     Zahra| 24|
+----------+---+



In [None]:
# Filter data where age > 30
df.filter(df.age > 30).show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 35|
|  3|   Reza| 35|
+---+-------+---+



In [None]:
# Group by and count
df.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
| 25|    2|
| 35|    2|
| 30|    1|
| 23|    1|
| 24|    2|
+---+-----+



In [None]:
# Step 5: Stop the Spark Session
spark.stop()


# Why Use PySpark in Colab?
* ✅ Handles Large Datasets: Spark processes data in partitions and can spill to disk, whereas Pandas has to load the whole dataset into the memory of one machine.
* ✅ Parallel Processing: In local mode, Spark can run tasks on all available CPU cores, which speeds up operations on large data.
* ✅ Distributed Computing: The same code can scale from a single machine to a cluster.
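
As a rough illustration of the parallel-processing point, the sketch below builds a local-mode master URL and passes it to the session builder. The helper names `local_master_url` and `build_local_session` are made up for this example and are not part of PySpark; `local[N]` and `local[*]` are Spark's standard local-mode master strings.

```python
import os


def local_master_url(cores=None):
    """Return a Spark local-mode master URL.

    cores=None means "use every available core" (local[*]).
    """
    if cores is None:
        return "local[*]"
    if cores < 1:
        raise ValueError("cores must be a positive integer")
    return f"local[{cores}]"


def build_local_session(app_name="ColabSpark", cores=None):
    """Create (or reuse) a SparkSession that runs on local CPU cores."""
    # Imported lazily so the helper above also works without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master(local_master_url(cores))
        .appName(app_name)
        .getOrCreate()
    )


print("CPU cores visible to Python:", os.cpu_count())
print("Master URL for all cores:", local_master_url())
print("Master URL for two cores:", local_master_url(2))
```

In Colab, calling `spark = build_local_session()` and then reading `spark.sparkContext.defaultParallelism` should show how many cores Spark will actually use.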

# 📌 Real Example: Analyzing a Big Dataset with PySpark in Google Colab
* In this tutorial, we'll use PySpark in Google Colab to analyze a large dataset step by step.

# 📂 Dataset: NYC Taxi Trips
* We'll use the New York City Taxi Trips dataset (🚕), which contains about 1.46 million training trips with details such as pickup/drop-off times, pickup/drop-off coordinates, passenger count, and trip duration.

✅ Dataset Source: NYC Taxi Trip Duration competition on Kaggle

✅ File Size: ~1GB (a large dataset for real PySpark processing)

## Step 1: Install PySpark and Dependencies

## Step 2: Download and Extract Dataset

## Dataset Description
The competition dataset is based on the 2016 NYC Yellow Cab trip record data made available in BigQuery on Google Cloud Platform. The data was originally published by the NYC Taxi and Limousine Commission (TLC) and was sampled and cleaned for this playground competition. Based on individual trip attributes, participants predict the duration of each trip in the test set.

# File descriptions

* train.csv - the training set (contains 1458644 trip records)
* test.csv - the testing set (contains 625134 trip records)
* sample_submission.csv - a sample submission file in the correct format

## Data fields
* id - a unique identifier for each trip
* vendor_id - a code indicating the provider (taxi company) associated with the trip record
* pickup_datetime - date and time when the meter was engaged (start of the trip)
* dropoff_datetime - date and time when the meter was disengaged (end of the trip)
* passenger_count - the number of passengers in the vehicle (driver-entered value)
* pickup_longitude - the longitude where the meter was engaged
* pickup_latitude - the latitude where the meter was engaged
* dropoff_longitude - the longitude where the meter was disengaged
* dropoff_latitude - the latitude where the meter was disengaged
* store_and_fwd_flag - indicates whether the trip record was held in vehicle memory before being sent to the vendor because the vehicle had no connection to the server (Y = store and forward; N = not a store-and-forward trip)
* trip_duration - duration of the trip in seconds (the target value the model predicts)

🧭 Steps to Download kaggle.json
* Go to Kaggle: open https://www.kaggle.com/ and sign in to your account.
* Go to your account settings: click your profile picture (top-right corner), then go to "Account".
* Scroll down to the API section: look for a section titled "API".
* Click “Create New API Token”: this automatically downloads a file named kaggle.json to your computer.

In [None]:
# Install Kaggle API
!pip install kaggle

Step 1: Upload your kaggle.json

In Colab, run this cell to upload the file manually. A file picker will open; choose your kaggle.json file from your local machine.



In [4]:

from google.colab import files
files.upload()

{}

In [None]:
# Set up Kaggle API (Upload your kaggle.json if needed)
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
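
Before calling the Kaggle CLI, it can help to confirm that the credentials file is really in place; an upload cell that returns `{}` means no file was uploaded. The following is a minimal sketch, assuming the file holds a JSON object with `username` and `key` fields. The function name `check_kaggle_credentials` is invented for this example.

```python
import json
import stat
from pathlib import Path


def check_kaggle_credentials(path):
    """Return a list of problems found with a kaggle.json file (empty if OK)."""
    path = Path(path).expanduser()
    if not path.is_file():
        return [f"{path} does not exist"]
    try:
        creds = json.loads(path.read_text())
    except json.JSONDecodeError:
        return [f"{path} is not valid JSON"]
    if not isinstance(creds, dict):
        return [f"{path} does not contain a JSON object"]

    problems = []
    for field in ("username", "key"):
        if not creds.get(field):
            problems.append(f"missing or empty field: {field}")

    # The token is a secret, so group/other users should have no access.
    mode = stat.S_IMODE(path.stat().st_mode)
    if mode & 0o077:
        problems.append(f"permissions are {oct(mode)}; expected 0o600")
    return problems


for problem in check_kaggle_credentials("~/.kaggle/kaggle.json"):
    print("kaggle.json:", problem)
```

If the loop prints nothing, the file exists, parses, and has restrictive permissions; it does not prove that the token itself is still valid.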



In [None]:
# The dataset described above belongs to the "NYC Taxi Trip Duration" competition.
# A "403 Client Error: Forbidden" response usually means the competition rules have
# not yet been accepted on the Kaggle website for this account.
!kaggle competitions download -c nyc-taxi-trip-duration


In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Step 3: Start PySpark Session

In [7]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("NYC_Taxi_Analysis").getOrCreate()

# Print Spark version
print("Apache Spark version:", spark.version)

Apache Spark version: 3.5.5


## Step 4: Load the Large Dataset

In [9]:
# Load CSV into PySpark DataFrame
df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/train.csv", header=True, inferSchema=True)

# Show first few rows
df.show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|    

## Step 5: Inspect the Data

In [10]:
# Print DataFrame schema (column types)
df.printSchema()

# Get total number of rows
print("Total rows:", df.count())

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)

Total rows: 114370
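
The schema printed above was inferred, which makes Spark read the file an extra time to guess the types. A possible alternative, sketched below, is to pass an explicit schema as a DDL string. The names `TRIP_FIELDS`, `TRIP_SCHEMA_DDL`, and `load_trips` are illustrative; the column types are copied from the `printSchema()` output.

```python
# Column names and Spark SQL types, matching the printSchema() output.
TRIP_FIELDS = [
    ("id", "STRING"),
    ("vendor_id", "INT"),
    ("pickup_datetime", "TIMESTAMP"),
    ("dropoff_datetime", "TIMESTAMP"),
    ("passenger_count", "INT"),
    ("pickup_longitude", "DOUBLE"),
    ("pickup_latitude", "DOUBLE"),
    ("dropoff_longitude", "DOUBLE"),
    ("dropoff_latitude", "DOUBLE"),
    ("store_and_fwd_flag", "STRING"),
    ("trip_duration", "INT"),
]

# DDL string of the form "id STRING, vendor_id INT, ..."
TRIP_SCHEMA_DDL = ", ".join(f"{name} {dtype}" for name, dtype in TRIP_FIELDS)


def load_trips(spark, path):
    """Read the trips CSV with a fixed schema instead of inferSchema=True."""
    return spark.read.csv(path, header=True, schema=TRIP_SCHEMA_DDL)


print(TRIP_SCHEMA_DDL)
```

With an active session this would be used as `df = load_trips(spark, "/content/drive/MyDrive/Colab Notebooks/train.csv")`.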


## Step 6: Data Cleaning & Preprocessing
* Drop Unnecessary Columns

In [11]:
df = df.drop("id")  # Drop 'id' column as it's not needed
df.show(5)

+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|              1| -73.9790267944336|40.763938903808594|-74.005332

## Convert Dates to Proper Format

In [12]:
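from pyspark.sql.functions import col, to_timestamp

# With inferSchema=True these columns are already timestamps (see the schema above),
# so to_timestamp() leaves them unchanged; it matters only if they were read as strings
df = df.withColumn("pickup_datetime", to_timestamp(col("pickup_datetime")))
df = df.withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime")))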

df.show(5)

+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|              1| -73.9790267944336|40.763938903808594|-74.005332

## Step 7: Exploratory Data Analysis (EDA)
* Get Basic Statistics

In [13]:
df.describe().show()

+-------+-------------------+------------------+-------------------+-------------------+--------------------+-------------------+------------------+------------------+
|summary|          vendor_id|   passenger_count|   pickup_longitude|    pickup_latitude|   dropoff_longitude|   dropoff_latitude|store_and_fwd_flag|     trip_duration|
+-------+-------------------+------------------+-------------------+-------------------+--------------------+-------------------+------------------+------------------+
|  count|             114370|            114370|             114370|             114369|              114369|             114369|            114369|            114369|
|   mean|  1.533199265541663|1.6655941243333041| -73.97350351617628|  40.75104375519249|   -73.9734628172034|  40.75194841477329|              NULL| 936.3912861002544|
| stddev|0.49889877233499447|1.3146960285567282|0.03832935926787627|0.02810707582982428|0.036676833852487066|0.03247940958313746|              NULL|2967.5182289
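
The `count` row above is not the same for every column (114370 versus 114369), which suggests that at least one row has missing values. The sketch below turns those counts into a per-column number of missing values and also shows one way to compute the same thing directly in Spark. Both function names are illustrative.

```python
def missing_from_counts(total_rows, non_null_counts):
    """Map each incomplete column to the number of rows where it is null."""
    return {
        column: total_rows - count
        for column, count in non_null_counts.items()
        if count < total_rows
    }


def null_counts(df):
    """Count nulls per column in Spark with a single aggregation."""
    # Imported lazily so missing_from_counts also works without PySpark installed.
    from pyspark.sql import functions as F

    return df.select(
        [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]
    )


# "count" row copied from the describe() output; 114370 is the total row count.
describe_counts = {
    "vendor_id": 114370,
    "passenger_count": 114370,
    "pickup_longitude": 114370,
    "pickup_latitude": 114369,
    "dropoff_longitude": 114369,
    "dropoff_latitude": 114369,
    "store_and_fwd_flag": 114369,
    "trip_duration": 114369,
}
print(missing_from_counts(114370, describe_counts))
```

In the notebook, `null_counts(df).show()` would give the same information for every column, including the timestamp columns that `describe()` leaves out.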

* Find the Most Common Pickup Locations

In [14]:
from pyspark.sql.functions import col

# Count trips per exact (longitude, latitude) pair and list the most frequent pairs
df.groupBy("pickup_longitude", "pickup_latitude").count().orderBy(col("count").desc()).show(10)

+------------------+------------------+-----+
|  pickup_longitude|   pickup_latitude|count|
+------------------+------------------+-----+
|-73.95466613769531| 40.82100296020508|    4|
|-73.87307739257812| 40.77412033081055|    4|
| -73.8708267211914|40.773738861083984|    4|
| -73.9940414428711|40.751129150390625|    3|
|-73.86337280273438| 40.76995849609375|    3|
|-73.77677917480469| 40.64537811279297|    3|
| -73.9941177368164| 40.75115966796875|    3|
|-73.87452697753906| 40.77410888671875|    3|
|-73.77670288085938| 40.64543151855469|    3|
|-73.78205871582031|40.644718170166016|    3|
+------------------+------------------+-----+
only showing top 10 rows
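
Because the coordinates are raw floating-point values, nearly identical locations end up in separate groups, which is why the top counts above are so small. One possible refinement, sketched here as an assumption rather than as part of the original analysis, is to round the coordinates to a grid before grouping (0.001 degrees of latitude is roughly 100 m). The names `grid_cell` and `top_pickup_cells` are illustrative.

```python
from collections import Counter


def grid_cell(longitude, latitude, decimals=3):
    """Snap a coordinate pair to a grid by rounding both values."""
    return (round(longitude, decimals), round(latitude, decimals))


def top_pickup_cells(df, decimals=3, n=10):
    """PySpark version: count pickups per rounded (longitude, latitude) cell."""
    # Imported lazily so grid_cell also works without PySpark installed.
    from pyspark.sql import functions as F

    return (
        df.groupBy(
            F.round("pickup_longitude", decimals).alias("lon_cell"),
            F.round("pickup_latitude", decimals).alias("lat_cell"),
        )
        .count()
        .orderBy(F.desc("count"))
        .limit(n)
    )


# Two rows from the output above that differ only after the fourth decimal.
pickups = [
    (-73.9940414428711, 40.751129150390625),
    (-73.9941177368164, 40.75115966796875),
]
cells = Counter(grid_cell(lon, lat) for lon, lat in pickups)
print(cells)
```

Both sample points fall into the same cell, so their counts are combined. In the notebook, `top_pickup_cells(df).show()` would apply the same idea to the full DataFrame.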



* Find the Average Trip Duration

In [15]:
from pyspark.sql.functions import mean

df.select(mean("trip_duration")).show()

+------------------+
|avg(trip_duration)|
+------------------+
| 936.3912861002544|
+------------------+



## Step 8: Feature Engineering
* Create a New Column for Trip Duration in Minutes

In [16]:
from pyspark.sql import functions as F

# Use the F namespace so Spark's round() does not shadow Python's built-in round()
df = df.withColumn(
    "trip_duration_mins",
    F.round((F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")) / 60, 2),
)
df.select("trip_duration", "trip_duration_mins").show(5)

+-------------+------------------+
|trip_duration|trip_duration_mins|
+-------------+------------------+
|          455|              7.58|
|          663|             11.05|
|         2124|              35.4|
|          429|              7.15|
|          435|              7.25|
+-------------+------------------+
only showing top 5 rows
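
The conversion can be checked by hand against the first rows of the dataset: the new column is the difference between drop-off and pickup time in seconds, divided by 60 and rounded to two decimals. The helper `duration_minutes` below is an illustrative plain-Python version of that arithmetic, not part of the Spark pipeline.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def duration_minutes(pickup, dropoff):
    """Trip length in minutes, rounded to 2 decimals like the Spark column."""
    delta = datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)
    return round(delta.total_seconds() / 60, 2)


# First two trips as shown in the df.show(5) output (455 s and 663 s).
print(duration_minutes("2016-03-14 17:24:55", "2016-03-14 17:32:30"))
print(duration_minutes("2016-06-12 00:43:35", "2016-06-12 00:54:38"))
```

The two results match the `trip_duration_mins` values of 7.58 and 11.05 in the table above.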



## Step 9: Data Visualization
*  Visualize Trip Duration Distribution

In [18]:
import matplotlib.pyplot as plt

# Convert to Pandas for plotting
trip_durations = df.select("trip_duration_mins").sample(fraction=0.01).toPandas()

In [19]:
trip_durations

      trip_duration_mins
0                  24.63
1                  16.63
2                  45.98
3                   7.47
4                  14.42
...                  ...
1023               19.22
1024               32.77
1025               40.75
1026                7.02


In [None]:
# Plot histogram
plt.figure(figsize=(8,5))
plt.hist(trip_durations["trip_duration_mins"], bins=50, color="blue", alpha=0.7)
plt.xlabel("Trip Duration (mins)")
plt.ylabel("Frequency")
plt.title("Distribution of Trip Duration")
plt.show()


In [None]:
# Visualize the Number of Passengers per Ride
passenger_counts = df.groupBy("passenger_count").count().orderBy("passenger_count").toPandas()

plt.bar(passenger_counts["passenger_count"], passenger_counts["count"], color="green")
plt.xlabel("Passenger Count")
plt.ylabel("Number of Trips")
plt.title("Number of Passengers Per Ride")
plt.show()

## Step 10: Machine Learning with Spark ML

In [None]:
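# Prepare data for the regression model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Select features
feature_cols = ["passenger_count", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]

# describe() showed one fewer non-null value in some columns; VectorAssembler
# raises an error on nulls by default, so drop incomplete rows first
df_clean = df.dropna(subset=feature_cols + ["trip_duration_mins"])

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_ml = assembler.transform(df_clean)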


In [None]:
# Select final dataset for ML
df_ml = df_ml.select("features", "trip_duration_mins")
df_ml.show(5)

## Step 11: Train a Linear Regression Model

In [None]:
# Split data into training and testing sets
train_data, test_data = df_ml.randomSplit([0.8, 0.2], seed=42)


In [None]:
# Train model
lr = LinearRegression(featuresCol="features", labelCol="trip_duration_mins")
model = lr.fit(train_data)

In [None]:
# Print model coefficients
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)
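
To make the printed coefficients and intercept easier to read: a linear regression prediction is the intercept plus the sum of each coefficient times its feature value. The sketch below shows that arithmetic in plain Python. The numbers are hypothetical and chosen only for illustration; they are not output from the model above.

```python
def predict_minutes(intercept, coefficients, features):
    """Linear-regression prediction: intercept + sum(coef_i * feature_i)."""
    if len(coefficients) != len(features):
        raise ValueError("coefficients and features must have the same length")
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Hypothetical values for illustration only.
example_intercept = 10.0
example_coefficients = [0.5, 0.0, 0.0, 0.0, 0.0]
# Feature order: passenger_count, pickup_lon, pickup_lat, dropoff_lon, dropoff_lat
example_features = [2, -73.98, 40.77, -73.96, 40.77]

print(predict_minutes(example_intercept, example_coefficients, example_features))
```

With the trained model, the same function could be called as `predict_minutes(model.intercept, model.coefficients.toArray(), row_features)`, where `row_features` is a hypothetical list holding one trip's feature values in the order of `feature_cols`.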

## Step 12: Model Evaluation

In [None]:
# Make predictions on test data
predictions = model.transform(test_data)
predictions.select("trip_duration_mins", "prediction").show(5)

In [None]:
# Evaluate model
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="trip_duration_mins", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE):", rmse)
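
For reference, RMSE is the square root of the mean squared difference between labels and predictions. The sketch below computes it in plain Python and adds a simple baseline (always predicting the mean) that a model's RMSE can be compared against. The baseline comparison is a suggestion, not part of the original notebook, and both function names are illustrative.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def mean_baseline_rmse(labels):
    """RMSE obtained by always predicting the mean of the labels."""
    mean_label = sum(labels) / len(labels)
    return rmse(labels, [mean_label] * len(labels))


# Durations in minutes from the first five rows shown earlier.
durations = [7.58, 11.05, 35.4, 7.15, 7.25]
print("RMSE of perfect predictions:", rmse(durations, durations))
print("RMSE of the mean baseline:", mean_baseline_rmse(durations))
```

A model is only useful if its RMSE on the test set is clearly lower than this kind of baseline computed on the same data.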


In [None]:
# Alternative setup: install Java and a standalone Spark 3.3.2 distribution
!apt-get install openjdk-11-jdk -y
# "tar xz" (without v) extracts quietly instead of listing every file
!wget -qO - https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz | tar xz -C /opt/

# Set environment variables so findspark can locate Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark-3.3.2-bin-hadoop3"

# Install PySpark
!pip install -q findspark pyspark


In [None]:

# Initialize PySpark
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("ColabSparkExample").getOrCreate()

# Load Sample Data (Can be replaced with your dataset)
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)

# Show Data
df.show()

# Simple Data Transformation
df_filtered = df.filter(df.Age > 25)
df_filtered.show()

# Stop Spark Session
spark.stop()
