In [1]:
from pyspark.sql import SparkSession

team = 39

warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

In [31]:
metrics = spark.read.format("avro").table('team39_projectdb.metrics_part')
stations = spark.read.format("avro").table('team39_projectdb.stations')

In [32]:
df = metrics.join(stations, on='sid', how='inner')

In [33]:
import math
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCols
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.types import DoubleType

class Encoder(Transformer, HasInputCol, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, inputCol, outputCols, n):
        super(Encoder, self).__init__()
        self._set(inputCol=inputCol, outputCols=outputCols)
        self.n = n

    def _transform(self, dataset):
        input_col = self.getInputCol()
        output_cols = self.getOutputCols()
        
        dataset = dataset.withColumn(output_cols[0], F.sin(2 * math.pi * F.col(input_col) / self.n))
        
        return dataset
    
    
df = df.withColumn("year", F.year("date_time"))
df = df.withColumn("month", F.month("date_time"))
df = df.withColumn("day", F.dayofmonth("date_time"))
df = df.withColumn("hour", F.hour("date_time"))
df = Encoder("month", ["month"], 12).transform(df)
df = Encoder("day", ["day"], 31).transform(df)
df = Encoder("hour", ["hour"], 24).transform(df)


In [34]:
import math
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols, HasOutputCols
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.types import StructType, StructField, DoubleType

class GeodeticToECEFTransformer(Transformer, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, inputCols=None, outputCols=None, defaultAltitude=0.0):
        super(GeodeticToECEFTransformer, self).__init__()
        self._set(inputCols=inputCols, outputCols=outputCols)
        self.defaultAltitude = defaultAltitude

    def _transform(self, dataset):
        input_cols = self.getInputCols()
        output_cols = self.getOutputCols()
        
        a = 6378137.0
        f = 1 / 298.257223563
        b = a * (1 - f)
        e2 = 1 - (b ** 2) / (a ** 2)
        
        def geodetic_to_ecef(lat, lon, alt):
            lat_rad = math.radians(lat)
            lon_rad = math.radians(lon)
            
            N = a / math.sqrt(1 - e2 * math.sin(lat_rad) ** 2)
            
            x = (N + alt) * math.cos(lat_rad) * math.cos(lon_rad)
            y = (N + alt) * math.cos(lat_rad) * math.sin(lon_rad)
            z = ((1 - e2) * N + alt) * math.sin(lat_rad)
            
            return (x, y, z)
        
        output_schema = StructType([
            StructField(output_cols[0], DoubleType(), True),
            StructField(output_cols[1], DoubleType(), True),
            StructField(output_cols[2], DoubleType(), True)
        ])
        
        geodetic_to_ecef_udf = F.udf(geodetic_to_ecef, output_schema)
        
        dataset = dataset.withColumn("ecef_coords", geodetic_to_ecef_udf(F.col(input_cols[0]), F.col(input_cols[1]), F.lit(self.defaultAltitude)))
        
        dataset = dataset.withColumn(output_cols[0], F.col("ecef_coords")[output_cols[0]])
        dataset = dataset.withColumn(output_cols[1], F.col("ecef_coords")[output_cols[1]])
        dataset = dataset.withColumn(output_cols[2], F.col("ecef_coords")[output_cols[2]])
        
        dataset = dataset.drop("ecef_coords")
        
        return dataset
    
    
df = df.withColumn("latitude", F.col("latitude").cast(DoubleType()))
df = df.withColumn("longitude", F.col("longitude").cast(DoubleType()))
df = GeodeticToECEFTransformer(
    inputCols=["latitude", "longitude"],
    outputCols=["x", "y", "z"],
    defaultAltitude=0.0
).transform(df)

In [35]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

feature_cols = ["so2", "co", "o3", "o3_8hr", "pm10", "pm2_5", "no2", "nox", "no", "windspeed", "winddirec", "co_8hr", "pm2_5_avg", "pm10_avg", "so2_avg", "year", "x", "y", "z"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
pipeline = Pipeline(stages=[assembler, scaler])
model = pipeline.fit(df)

df = model.transform(df)

In [36]:
df = df.select("scaledFeatures", "aqi")

In [37]:
train_data, test_data = df.randomSplit([0.7, 0.4], seed=42)


In [38]:
train_data.head(1)

[Row(scaledFeatures=DenseVector([0.2025, -0.0146, -0.4297, -1.1124, 0.05, -0.4687, -0.81, -0.3059, 0.5992, -0.9092, -0.9018, 0.1632, -0.3538, -0.1872, 0.6064, -1.1079, 0.4474, 0.7067, -0.5317]), aqi=Decimal('38.0'))]

In [41]:
import os
def run(command):
    return os.popen(command).read()
train_data.select("scaledFeatures", "aqi")\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

In [42]:
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

''

In [43]:
test_data.select("scaledFeatures", "aqi")\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

In [44]:
run("hdfs dfs -cat project/data/test/*.json > ../data/test.json")

''

In [14]:
import os
def run(command):
    return os.popen(command).read()

In [16]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="aqi")

parameters = {
    'regParam': [0.01, 0.1, 1.0],
    'elasticNetParam': [0.0, 0.5, 1.0]
}

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, parameters['regParam'])
             .addGrid(lr.elasticNetParam, parameters['elasticNetParam'])
             .build())

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="rmse"),
                          numFolds=3)

cvModel = crossval.fit(train_data)
bestModel = cvModel.bestModel

In [17]:
bestRegParam = bestModel._java_obj.getRegParam()
bestElasticNetParam = bestModel._java_obj.getElasticNetParam()

print(f"Best regParam: {bestRegParam}")
print(f"Best elasticNetParam: {bestElasticNetParam}")
predictions = cvModel.transform(test_data)

evaluator = RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

evaluator = RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(f"R2 = {r2}")

Best regParam: 0.01
Best elasticNetParam: 0.0
Root Mean Squared Error (RMSE) on test data = 7.014512661312377
R2 = 0.9367780344451548


In [18]:
bestModel.write().overwrite().save("project/models/model1")

In [11]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="scaledFeatures", labelCol="aqi")

parameters = {
    'maxDepth': [5, 10],
    'maxIter': [5, 10]
}

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, parameters['maxDepth'])
             .addGrid(gbt.maxIter, parameters['maxIter'])
             .build())

crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="rmse"),
                          numFolds=3)

cvModel = crossval.fit(train_data)
bestModel = cvModel.bestModel

In [12]:
bestMaxDepth = bestModel._java_obj.getMaxDepth()
bestMaxIter = bestModel._java_obj.getMaxIter()
bestStepSize = bestModel._java_obj.getStepSize()

print(f"Best maxDepth: {bestMaxDepth}")
print(f"Best maxIter: {bestMaxIter}")
print(f"Best stepSize: {bestStepSize}")

predictions = cvModel.transform(test_data)

evaluator = RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

evaluator = RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(f"R2 = {r2}")

Best maxDepth: 10
Best maxIter: 10
Best stepSize: 0.1
Root Mean Squared Error (RMSE) on test data = 2.83174226002011
R2 = 0.9900065968720342


In [15]:
bestModel.write().overwrite().save("project/models/model2")

In [28]:
from pyspark.ml.regression import GBTRegressionModel

# Load the model
loadedModel = GBTRegressionModel.load("project/models/model2")

# Assuming test_data is your DataFrame
p = loadedModel.transform(test_data)

evaluator = RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(p)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

evaluator = RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(p)
print(f"R2 = {r2}")

Root Mean Squared Error (RMSE) on test data = 2.818808693814717
R2 = 0.9895885309978958


In [30]:
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator

# Load the model
loadedModel = LinearRegressionModel.load("project/models/model1")

# Assuming test_data is your DataFrame
p = loadedModel.transform(test_data)

evaluator = RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(p)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

evaluator = RegressionEvaluator(labelCol="aqi", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(p)
print(f"R2 = {r2}")

Root Mean Squared Error (RMSE) on test data = 7.081189343071962
R2 = 0.9370092311678303
