In [1]:
from google.colab import drive
# Mount the user's Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): the CSV loaded later in this notebook is read from /content/,
# not from /content/drive — confirm whether this mount is actually required.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=6c412dbb84cdbbc8cccde6977c446c57a564563b469ad81ffd73c8481d644929
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_set, format_number, max
import pandas as pd
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Create (or reuse) a local Spark session named "Pyspark_SQL" with the
# Spark UI bound to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Pyspark_SQL")
    .config("spark.ui.port", '4050')
    .getOrCreate()
)

# Load the passenger statistics CSV, using its first row as column names.
# No schema is supplied here; the numeric columns are cast in a later cell.
df = (
    spark.read
    .option("Header", True)
    .csv("/content/Air_Traffic_Passenger_Statistics.csv")
)

In [4]:
from pyspark.sql.functions import mean, stddev

# Columns that are cast to double before any numeric analysis.
columns_to_cast = ['Activity Period', 'Passenger Count', 'Adjusted Passenger Count', 'year']

# Replace each listed column with its double-typed version (same column name).
for name in columns_to_cast:
    as_double = col(name).cast('double')
    df = df.withColumn(name, as_double)

# Every column whose Spark dtype is numeric after the casts above.
numeric_dtypes = ("int", "double", "float")
numeric_columns = [name for name, kind in df.dtypes if kind in numeric_dtypes]

# Show one small table per numeric column with its mean and standard deviation.
for name in numeric_columns:
    mean_expr = mean(col(name)).alias('mean_' + name)
    stddev_expr = stddev(col(name)).alias('stddev_' + name)
    df.select(mean_expr, stddev_expr).show()

+--------------------+----------------------+
|mean_Activity Period|stddev_Activity Period|
+--------------------+----------------------+
|  201045.07336576266|    313.33619609986414|
+--------------------+----------------------+

+--------------------+----------------------+
|mean_Passenger Count|stddev_Passenger Count|
+--------------------+----------------------+
|  29240.521090157927|    58319.509284123524|
+--------------------+----------------------+

+-----------------------------+-------------------------------+
|mean_Adjusted Passenger Count|stddev_Adjusted Passenger Count|
+-----------------------------+-------------------------------+
|           29331.917105350836|               58284.1822186625|
+-----------------------------+-------------------------------+

+-----------------+-----------------+
|        mean_year|      stddev_year|
+-----------------+-----------------+
|2010.385220230559|3.137589043169972|
+-----------------+-----------------+



In [5]:
import pandas as pd

# Build the pairwise correlation matrix of all numeric columns.
#
# `df.stat.corr` triggers a Spark job per call, and correlation is symmetric
# (corr(x, y) == corr(y, x)), so only the upper triangle (including the
# diagonal) is computed and each value is mirrored into the lower triangle.
# This roughly halves the number of Spark jobs compared with evaluating every
# ordered pair, and guarantees the resulting matrix is exactly symmetric.
numeric_columns = [name for name, dtype in df.dtypes if dtype in ('double', 'float', 'int')]
n_numeric = len(numeric_columns)
correlation_matrix = [[None] * n_numeric for _ in range(n_numeric)]

for i, x in enumerate(numeric_columns):
    for j in range(i, n_numeric):
        y = numeric_columns[j]
        value = df.stat.corr(x, y)
        correlation_matrix[i][j] = value
        correlation_matrix[j][i] = value

# Label rows and columns with the column names for readable output.
correlation_df = pd.DataFrame(correlation_matrix, columns=numeric_columns, index=numeric_columns)
print(correlation_df)


                          Activity Period  Passenger Count  \
Activity Period                  1.000000         0.060311   
Passenger Count                  0.060311         1.000000   
Adjusted Passenger Count         0.059336         0.999941   
year                             0.999940         0.060069   

                          Adjusted Passenger Count      year  
Activity Period                           0.059336  0.999940  
Passenger Count                           0.999941  0.060069  
Adjusted Passenger Count                  1.000000  0.059096  
year                                      0.059096  1.000000  


In [8]:

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Fit and evaluate a linear regression that predicts 'Passenger Count' from
# the remaining columns: categorical columns are string-indexed and one-hot
# encoded, numeric columns are used as-is, and everything is assembled into a
# single feature vector.

LABEL_COLUMN = "Passenger Count"
# Fixed seed so the train/test split, and therefore the reported RMSE / R2,
# are the same on every run instead of changing with each execution.
SPLIT_SEED = 42

columns_to_cast = ['Activity Period', 'Passenger Count', 'Adjusted Passenger Count', 'year']
all_columns = df.columns
# Re-applying the cast is harmless if an earlier cell already did it, and
# keeps this cell usable on a freshly loaded frame.
for column in columns_to_cast:
    df = df.withColumn(column, col(column).cast('double'))

# NOTE(review): 'Adjusted Passenger Count' correlates ~0.9999 with the label
# in the correlation matrix printed earlier in this notebook, so using it as
# a feature very likely leaks the target and explains the near-perfect R2.
# Confirm whether it should really be part of the feature set.
numeric_columns = ['Activity Period', 'Adjusted Passenger Count', 'year']
# Every column that is not cast to double is treated as categorical. The loop
# variable is deliberately not called `col`, to avoid shadowing the imported
# pyspark `col` function for readers of this cell.
categorical_columns = [name for name in all_columns if name not in columns_to_cast]

# handleInvalid="keep" puts invalid/unseen labels into an extra index bucket
# instead of failing when the test split contains a category not seen in training.
indexers = [
    StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="keep")
    for c in categorical_columns
]

encoders = [
    OneHotEncoder(inputCol=c + "_index", outputCol=c + "_vec")
    for c in categorical_columns
]

assemblerInputs = [c + "_vec" for c in categorical_columns] + numeric_columns

vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

lr = LinearRegression(featuresCol="features", labelCol=LABEL_COLUMN)

# Stage order matters: index -> encode -> assemble -> regress.
pipeline = Pipeline(stages=indexers + encoders + [vecAssembler, lr])

# 80/20 split; the indexers/encoders are fitted on the training part only.
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
model = pipeline.fit(train_data)

predictions = model.transform(test_data)
evaluator_rmse = RegressionEvaluator(labelCol=LABEL_COLUMN, predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol=LABEL_COLUMN, predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"RMSE: {rmse}")
print(f"R2: {r2}")

RMSE: 504.1521658099943
R2: 0.9999292132013743
