In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml.regression import LinearRegression

spark_session = SparkSession \
    .builder \
    .master("spark://192.168.2.156:7077") \
    .appName("ProjectGR25Predict") \
    .config("spark.eventLog.enabled", "false") \
    .config("spark.dynamicAllocation.enabled", True) \
    .config("spark.dynamicAllocation.minExecutors", 1) \
    .config("spark.dynamicAllocation.maxExecutors", 1) \
    .config("spark.dynamicAllocation.initialExecutors", 1) \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True) \
    .config("spark.shuffle.service.enabled", False) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
    .config("spark.executor.cores", 6) \
    .config("spark.driver.cores", 2) \
    .config("spark.driver.port", 9999) \
    .config("spark.blockManager.port", 10005) \
    .getOrCreate()

spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")


In [2]:
# Read the dataset from hdfs
df = spark_session.read.csv("hdfs://192.168.2.92:9000/spark-data/MillionSongSubset.csv", header=True, inferSchema=True)

                                                                                

In [3]:
# Select relevant features and separate the entries with year
df_0 = df.select("tempo", "loudness", "duration", "key", "mode", "year").filter(col("year") == 0)
df = df.select("tempo", "loudness", "duration", "key", "mode", "year").filter(col("year") > 0)  

# Assemble for defining feature vector
features = ["tempo", "loudness", "duration", "key", "mode"]
assembler_model = VectorAssembler(inputCols=features, outputCol="features")

# Select target (year) and the rest as features
df_trans = assembler_model.transform(df).select("features", "year")

In [9]:
# Use minimax sclaer to normalize data
normalizing_scaler = MinMaxScaler(inputCol="features", outputCol="normalized_data")
normalizing_model = normalizing_scaler.fit(df_trans)
df_normal = normalizing_model.transform(df_trans).select("normalized_data", "year")

# Split data into train and test
train_data, test_data = df_normal.randomSplit([0.8, 0.2], seed=42)

# Define model and train
model = LinearRegression(featuresCol="normalized_data", labelCol="year")
trained_model = model.fit(train_data)

# Predict and round prediction to have integer number
predictions = trained_model.transform(test_data)
df_rounded_predictions = predictions.withColumn("predicted_year", round(predictions["prediction"]))

# Print some prediction
print("Model Predictions")
df_rounded_predictions.select("normalized_data", "year", "predicted_year").show(20)

# Evaluate model 
model_evaluation = trained_model.evaluate(test_data)
print(f"Average year penalty: {model_evaluation.rootMeanSquaredError:.2f}")

Model Predictions
+--------------------+----+--------------+
|     normalized_data|year|predicted_year|
+--------------------+----+--------------+
|[0.0,0.4316688693...|1981|        1985.0|
|[0.0,0.8118140550...|1991|        1999.0|
|[0.0,0.8488191691...|2003|        2001.0|
|[0.12760701569911...|1981|        1977.0|
|[0.14861390846499...|1936|        1998.0|
|[0.15611747468851...|1999|        1998.0|
|[0.16439420590156...|2005|        1997.0|
|[0.17592596172060...|2008|        1999.0|
|[0.20075615535977...|2006|        1991.0|
|[0.20377149881899...|2006|        1998.0|
|[0.20462584613243...|1995|        1995.0|
|[0.21352110933712...|1953|        1996.0|
|[0.21501331776694...|2003|        1995.0|
|[0.21991131797569...|2007|        1997.0|
|[0.22571392122221...|1993|        1996.0|
|[0.23230515275807...|2006|        1998.0|
|[0.23626762332948...|2005|        1993.0|
|[0.25054798068633...|2009|        1998.0|
|[0.25845359270441...|1971|        1992.0|
|[0.26078081932293...|2004|        1

In [15]:
# Predict year for missing entries
df_0_transformed = assembler_model.transform(df_0).select("features")

# Normalize
df_0_normalized = normalizing_model.transform(df_0_transformed).select("normalized_data")

# Predict and round prediction to have integer number
predictions_0 = trained_model.transform(df_0_normalized)
df_prediction_0 = predictions_0.withColumn("predicted_year", round(predictions_0["prediction"]))

# Print some predictions
df_prediction_0.select("normalized_data", "predicted_year").show(20)

+--------------------+--------------+
|     normalized_data|predicted_year|
+--------------------+--------------+
|[0.46425851544590...|        1997.0|
|[0.47192831214216...|        1995.0|
|[0.46420825972158...|        2000.0|
|[0.41305179818847...|        1995.0|
|[0.82110508471955...|        1998.0|
|[0.46384873800144...|        1999.0|
|[0.52798277388403...|        1994.0|
|[0.38908754933759...|        1992.0|
|[0.37019526281810...|        1997.0|
|[0.39758463257266...|        1986.0|
|[0.47644359568110...|        1999.0|
|[0.55479613572138...|        2000.0|
|[0.47890226034784...|        2000.0|
|[0.59034626194056...|        1992.0|
|[0.46922996632866...|        1994.0|
|[0.61242012239201...|        1997.0|
|[0.60411246457937...|        1990.0|
|[0.78357565612713...|        1990.0|
|[0.42117389640362...|        1982.0|
|[0.38680671261843...|        2000.0|
+--------------------+--------------+
only showing top 20 rows



In [16]:
spark_context.stop()