In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import col, sin, cos, radians
from pyspark.sql import DataFrame
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pandas as pd
import os


TEAM = 19
WAREHOUSE = "project/warehouse"

In [2]:
spark = SparkSession.builder\
        .appName(f"{TEAM} - spark ML")\
        .master("yarn")\
        .config("hive.metastore.uris",
                "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", WAREHOUSE)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

In [3]:
for table in spark.catalog.listTables("team19_projectdb"):
    print(table.name)

air_quality
astronomical_data_part
locations
q1_results
q2_results
q4_results
q5_results
weather_conditions


In [4]:
air_quality = spark.read.format("avro").table('team19_projectdb.air_quality')
astronomical_data = spark.read.format("avro").table('team19_projectdb.astronomical_data_part')
locations = spark.read.format("avro").table('team19_projectdb.locations')
weather_conditions = spark.read.format("avro").table('team19_projectdb.weather_conditions')

In [5]:
table_for_ML = air_quality.join(locations, on=["id"], how="inner").join(weather_conditions, on=["id"], how="inner")

In [6]:
class LatLongToECEF(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self):
        super(LatLongToECEF, self).__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        # WGS84 constants
        a = 6378137.0  # semi-major axis
        e2 = 6.6943799901377997e-3

        # Convert degrees to radians
        lat_rad = radians(col("latitude"))
        lon_rad = radians(col("longitude"))

        # Calculate ECEF coordinates
        N = a / (1 - e2 * sin(lat_rad) ** 2) ** 0.5

        X = N * cos(lat_rad) * cos(lon_rad)
        Y = N * cos(lat_rad) * sin(lon_rad)
        Z = (1-e2) * N * sin(lat_rad)

        return df.withColumn("x", X).withColumn("y", Y).withColumn("z", Z)


ecof_transformer = LatLongToECEF()

In [25]:
ecof_transformer.transform(table_for_ML).head(1)

[Row(id=1, air_quality_carbon_monoxide=647.5, air_quality_ozone=130.2, air_quality_nitrogen_dioxide=1.2, air_quality_sulphur_dioxide=0.4, air_quality_pm2_5=7.9, air_quality_pm10=11.1, air_quality_us_epa_index=1, air_quality_gb_defra_index=1, country='Afghanistan', location_name='Kabul', latitude=34.52, longitude=69.18, timezone='Asia/Kabul', last_updated_epoch=1693301400, last_updated=1693306800000, temperature_celsius=28.8, temperature_fahrenheit=83.8, condition_text='Sunny', wind_mph=7.2, wind_kph=11.5, wind_degree=74, wind_direction='ENE', pressure_mb=1004.0, pressure_in=29.64, precip_mm=0.0, precip_in=0.0, humidity=19, cloud=0, feels_like_celsius=26.7, feels_like_fahrenheit=80.1, visibility_km=10.0, visibility_miles=6.0, uv_index=7.0, gust_mph=8.3, gust_kph=13.3, x=1869858.4270867545, y=4917265.761836958, z=3594120.1546326494)]

In [8]:
# target is string, so cast to int
indexer = StringIndexer(inputCol="condition_text", outputCol="label")

In [38]:
indexer.fit(table_for_ML).transform(table_for_ML).head(1)

[Row(id=1, air_quality_carbon_monoxide=647.5, air_quality_ozone=130.2, air_quality_nitrogen_dioxide=1.2, air_quality_sulphur_dioxide=0.4, air_quality_pm2_5=7.9, air_quality_pm10=11.1, air_quality_us_epa_index=1, air_quality_gb_defra_index=1, country='Afghanistan', location_name='Kabul', latitude=34.52, longitude=69.18, timezone='Asia/Kabul', last_updated_epoch=1693301400, last_updated=1693306800000, temperature_celsius=28.8, temperature_fahrenheit=83.8, condition_text='Sunny', wind_mph=7.2, wind_kph=11.5, wind_degree=74, wind_direction='ENE', pressure_mb=1004.0, pressure_in=29.64, precip_mm=0.0, precip_in=0.0, humidity=19, cloud=0, feels_like_celsius=26.7, feels_like_fahrenheit=80.1, visibility_km=10.0, visibility_miles=6.0, uv_index=7.0, gust_mph=8.3, gust_kph=13.3, target=2.0)]

In [9]:
features = ["air_quality_carbon_monoxide", "air_quality_ozone", "air_quality_nitrogen_dioxide", "air_quality_pm2_5", "air_quality_pm10" , "air_quality_us_epa_index", "air_quality_gb_defra_index",\
            "temperature_celsius", "wind_kph", "wind_degree", "pressure_mb", "precip_mm", "humidity", "cloud", "feels_like_celsius", "visibility_km", "gust_mph", "x", "y", "z"]

label = "label"

In [10]:
assembler = VectorAssembler(inputCols=features, outputCol= "features")

In [11]:
pipeline = Pipeline(stages=[indexer, ecof_transformer, assembler])

In [12]:
model = pipeline.fit(table_for_ML)
data = model.transform(table_for_ML)

In [13]:
data = data.select(["features", "label"])
data.head(1)

[Row(features=DenseVector([647.5, 130.2, 1.2, 7.9, 11.1, 1.0, 1.0, 28.8, 11.5, 74.0, 1004.0, 0.0, 19.0, 0.0, 26.7, 10.0, 8.3, 1869858.4271, 4917265.7618, 3594120.1546]), label=2.0)]

In [14]:
def run(command):
    return os.popen(command).read()

In [15]:
(train_data, test_data) = data.randomSplit([0.75, 0.25], seed = 10)

In [16]:
train_data.write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

test_data.write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

run("hdfs dfs -cat project/data/train/*.json > ~/project/big-data-pipeline-project/data/train.json")
run("hdfs dfs -cat project/data/test/*.json > ~/project/big-data-pipeline-project/data/test.json")

''

In [35]:
# FIRST MODEL TUNING
classifier_1 = DecisionTreeClassifier()

grid = ParamGridBuilder()
grid = grid.addGrid(classifier_1.maxDepth, [5, 7, 10]).addGrid(classifier_1.maxBins, [16, 32, 64]).build()

evaluator = MulticlassClassificationEvaluator()\
  .setLabelCol("label")\
  .setPredictionCol("prediction")\
  .setMetricName("f1")

cv = CrossValidator(estimator=classifier_1,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel_1 = cvModel.bestModel

predictions = bestModel_1.transform(test_data)

f1_1 = evaluator.evaluate(predictions)

f1_1

0.7825053623915481

In [28]:
bestModel_1.write().overwrite().save("project/models/model1")

In [26]:
predictions.select(predictions['label'], predictions["prediction"]).coalesce(1).write.csv('project/output/model1_predictions', header=True, mode = "overwrite")
run("hdfs dfs -mv project/output/model1_predictions/*.csv project/output/model1_predictions/model1_predictions.csv")
run("hdfs dfs -cat project/output/model1_predictions/model1_predictions.csv > ~/project/big-data-pipeline-project/output/model1_predictions.csv")

''

In [36]:
# SECOND MODEL TUNING
classifier_2 = RandomForestClassifier()

grid = ParamGridBuilder()
grid = grid.addGrid(classifier_2.maxDepth, [3,5,7]).addGrid(classifier_2.maxBins, [8,16,32]).addGrid(classifier_2.minInstancesPerNode, [1,2,4]).build()

evaluator = MulticlassClassificationEvaluator()\
  .setLabelCol("label")\
  .setPredictionCol("prediction")\
  .setMetricName("f1")

cv = CrossValidator(estimator=classifier_2,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel_2 = cvModel.bestModel

predictions = bestModel_2.transform(test_data)

f1_2 = evaluator.evaluate(predictions)
f1_2

0.7013255234970799

In [30]:
bestModel_2.write().overwrite().save("project/models/model2")

In [31]:
predictions.select(predictions['label'], predictions["prediction"]).coalesce(1).write.csv('project/output/model2_predictions', header=True, mode = "overwrite")
run("hdfs dfs -mv project/output/model2_predictions/*.csv project/output/model2_predictions/model2_predictions.csv")
run("hdfs dfs -cat project/output/model2_predictions/model2_predictions.csv > ~/project/big-data-pipeline-project/output/model2_predictions.csv")

''

In [32]:
# MOVING MODELS

In [52]:
run("hdfs dfs -get project/models/model1 ~/project/big-data-pipeline-project/models/")

''

In [53]:
run("hdfs dfs -get project/models/model2 ~/project/big-data-pipeline-project/models/")

''

In [37]:
comparison_data = {
    'model': [bestModel_1, bestModel_2],
    'f1': [f1_1, f1_2]
}

# Create a Pandas DataFrame from the dictionary
comparison_df = pd.DataFrame(comparison_data)
comparison_df.head()

Unnamed: 0,model,f1
0,DecisionTreeClassificationModel: uid=DecisionT...,0.782505
1,RandomForestClassificationModel: uid=RandomFor...,0.701326


In [40]:
comparison_path_local = "~/project/big-data-pipeline-project/output/evaluation.csv"
comparison_path_hdfs = "project/output/evaluation.csv"

In [43]:
comparison_df.to_csv(comparison_path_local, index=False)