In [1]:
# Import main packages
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Build (or reuse) the Spark session for the "wiki" application
sparkConf = SparkConf().setAppName("wiki")
session_builder = SparkSession.builder.config(conf=sparkConf)
spark = session_builder.getOrCreate()
# Keep a handle on the underlying SparkContext as well
sc = spark.sparkContext

22/12/01 14:13:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load the article/category table: tab-separated CSV with a header row,
# read in DROPMALFORMED mode.
categories_path = "hdfs://localhost:9000/user/bigdata2022/datasets/wiki/categories.tsv"
df_cat = (
    spark.read
    .format("csv")
    .option("mode", "DROPMALFORMED")
    .option("delimiter", "\t")
    .option("header", "true")
    .load(categories_path)
)
df_cat.show(5)

+--------------------+--------------------+
|             article|            category|
+--------------------+--------------------+
|%C3%81ed%C3%A1n_m...|subject.History.B...|
|%C3%81ed%C3%A1n_m...|subject.People.Hi...|
|          %C3%85land|   subject.Countries|
|          %C3%85land|subject.Geography...|
|  %C3%89douard_Manet|subject.People.Ar...|
+--------------------+--------------------+
only showing top 5 rows



In [4]:
from urllib.parse import unquote
# Percent-decode both columns and keep each row as an (article, category) tuple
rdd_cat = df_cat.rdd.map(
    lambda row: (unquote(row.article), unquote(row.category))
)
rdd_cat.collect()

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

[('Áedán_mac_Gabráin',
  'subject.History.British_History.British_History_1500_and_before_including_Roman_Britain'),
 ('Áedán_mac_Gabráin', 'subject.People.Historical_figures'),
 ('Åland', 'subject.Countries'),
 ('Åland', 'subject.Geography.European_Geography.European_Countries'),
 ('Édouard_Manet', 'subject.People.Artists'),
 ('Éire', 'subject.Countries'),
 ('Éire', 'subject.Geography.European_Geography.European_Countries'),
 ('Óengus_I_of_the_Picts',
  'subject.History.British_History.British_History_1500_and_before_including_Roman_Britain'),
 ('Óengus_I_of_the_Picts', 'subject.People.Historical_figures'),
 ('€2_commemorative_coins', 'subject.Business_Studies.Currency'),
 ('10th_century', 'subject.History.General_history'),
 ('11th_century', 'subject.History.General_history'),
 ('12th_century', 'subject.History.General_history'),
 ('13th_century', 'subject.History.General_history'),
 ('14th_century', 'subject.History.General_history'),
 ('15th_Marine_Expeditionary_Unit',
  'subject.H

In [5]:
#output_cat = rdd_cat.map(lambda x: (x[0],x[1])) \
#            .reduceByKey(lambda a, b: a+";"+b)
#output_cat.collect()

In [6]:
# Load the source/destination link table: tab-separated CSV with a header row,
# read in DROPMALFORMED mode.
links_path = "hdfs://localhost:9000/user/bigdata2022/datasets/wiki/links.tsv"
df_link = (
    spark.read
    .format("csv")
    .option("mode", "DROPMALFORMED")
    .option("delimiter", "\t")
    .option("header", "true")
    .load(links_path)
)
df_link.show(5)

+--------------------+--------------+
|              source|   destination|
+--------------------+--------------+
|%C3%81ed%C3%A1n_m...|          Bede|
|%C3%81ed%C3%A1n_m...|       Columba|
|%C3%81ed%C3%A1n_m...|D%C3%A1l_Riata|
|%C3%81ed%C3%A1n_m...| Great_Britain|
|%C3%81ed%C3%A1n_m...|       Ireland|
+--------------------+--------------+
only showing top 5 rows



In [7]:
from urllib.parse import unquote
# Percent-decode both columns and keep each row as a (source, destination) tuple
rdd_link = df_link.rdd.map(
    lambda row: (unquote(row.source), unquote(row.destination))
)
rdd_link.collect()

                                                                                

[('Áedán_mac_Gabráin', 'Bede'),
 ('Áedán_mac_Gabráin', 'Columba'),
 ('Áedán_mac_Gabráin', 'Dál_Riata'),
 ('Áedán_mac_Gabráin', 'Great_Britain'),
 ('Áedán_mac_Gabráin', 'Ireland'),
 ('Áedán_mac_Gabráin', 'Isle_of_Man'),
 ('Áedán_mac_Gabráin', 'Monarchy'),
 ('Áedán_mac_Gabráin', 'Orkney'),
 ('Áedán_mac_Gabráin', 'Picts'),
 ('Áedán_mac_Gabráin', 'Scotland'),
 ('Áedán_mac_Gabráin', 'Wales'),
 ('Åland', '20th_century'),
 ('Åland', 'Baltic_Sea'),
 ('Åland', 'Crimean_War'),
 ('Åland', 'Currency'),
 ('Åland', 'Euro'),
 ('Åland', 'European_Union'),
 ('Åland', 'Finland'),
 ('Åland', 'League_of_Nations'),
 ('Åland', 'List_of_countries_by_system_of_government'),
 ('Åland', 'Nationality'),
 ('Åland', 'Parliamentary_system'),
 ('Åland', 'Police'),
 ('Åland', 'Russia'),
 ('Åland', 'Stockholm'),
 ('Åland', 'Sweden'),
 ('Åland', 'Time_zone'),
 ('Åland', 'Tourism'),
 ('Åland', 'United_Kingdom'),
 ('Åland', 'World_War_II'),
 ('Édouard_Manet', 'Absinthe'),
 ('Édouard_Manet', 'Beer'),
 ('Édouard_Manet', 'C

In [8]:
#output_link = rdd_link.map(lambda x: (x[0],x[1])) \
#            .reduceByKey(lambda a, b: a+";"+b)
#output_link.collect()

In [9]:
# Load the finished-paths table: tab-separated CSV with a header row,
# read in DROPMALFORMED mode.
paths_finished_path = "hdfs://localhost:9000/user/bigdata2022/datasets/wiki/paths_finished.tsv"
df_path = (
    spark.read
    .format("csv")
    .option("mode", "DROPMALFORMED")
    .option("delimiter", "\t")
    .option("header", "true")
    .load(paths_finished_path)
)
df_path.show(5)

+----------------+----------+-------------+--------------------+------+
| hashedIpAddress| timestamp|durationInSec|                path|rating|
+----------------+----------+-------------+--------------------+------+
|6a3701d319fc3754|1297740409|          166|14th_century;15th...|  NULL|
|3824310e536af032|1344753412|           88|14th_century;Euro...|     3|
|415612e93584d30e|1349298640|          138|14th_century;Nige...|  NULL|
|64dd5cd342e3780c|1265613925|           37|14th_century;Rena...|  NULL|
|015245d773376aab|1366730828|          175|14th_century;Ital...|     3|
+----------------+----------+-------------+--------------------+------+
only showing top 5 rows



In [10]:
def formatRow(x):
    """Flatten one finished-path row into a tuple keyed by its endpoints.

    Parameters
    ----------
    x : row-like object
        Must expose ``path`` (a ";"-separated, percent-encoded list of
        article names), ``rating`` and ``durationInSec`` attributes.

    Returns
    -------
    tuple
        ``(key, duration, rating, ratingNum, length)`` where ``key`` is
        ``"<first article>::<last article>"``, ``duration`` is
        ``int(x.durationInSec)``, ``rating`` is the integer rating (0 when
        the rating is missing), ``ratingNum`` is 1 when a rating is present
        and 0 otherwise, and ``length`` is the number of entries in the path.

    Raises
    ------
    ValueError
        If ``durationInSec`` or a present rating is not an integer string.
    """
    values = unquote(x.path).split(";")
    length = len(values)
    # A missing rating may arrive either as the literal string "NULL" or as a
    # real null (None); int(None) would raise, so both are treated as unrated.
    if x.rating is None or x.rating == "NULL":
        rating = 0
        ratingNum = 0
    else:
        rating = int(x.rating)
        ratingNum = 1
    return (values[0]+"::"+values[-1],int(x.durationInSec),rating,ratingNum,length)

# Convert every finished-path row into its flat tuple form via formatRow
rdd_path = df_path.rdd.map(formatRow)
rdd_path.collect()

                                                                                

[('14th_century::African_slave_trade', 166, 0, 0, 9),
 ('14th_century::African_slave_trade', 88, 3, 1, 5),
 ('14th_century::African_slave_trade', 138, 0, 0, 8),
 ('14th_century::Greece', 37, 0, 0, 4),
 ('14th_century::John_F._Kennedy', 175, 3, 1, 7),
 ('14th_century::John_F._Kennedy', 110, 0, 0, 6),
 ('14th_century::Fire', 112, 2, 1, 4),
 ('14th_century::Rainbow', 139, 1, 1, 6),
 ('14th_century::Rainbow', 74, 3, 1, 4),
 ('14th_century::Rainbow', 167, 0, 0, 7),
 ('14th_century::Rainbow', 253, 3, 1, 13),
 ('14th_century::Rainbow', 218, 3, 1, 10),
 ('14th_century::Rainbow', 66, 0, 0, 5),
 ('14th_century::Rainbow', 391, 5, 1, 16),
 ('14th_century::Rainbow', 432, 0, 0, 9),
 ('14th_century::Rainbow', 43, 0, 0, 5),
 ('14th_century::Rainbow', 387, 2, 1, 15),
 ('14th_century::Rainbow', 179, 3, 1, 10),
 ('14th_century::Rainbow', 169, 0, 0, 8),
 ('14th_century::Rainbow', 246, 0, 0, 14),
 ('14th_century::Rainbow', 345, 3, 1, 8),
 ('14th_century::Rainbow', 265, 4, 1, 7),
 ('14th_century::Rainbow', 

In [11]:
# Sum of field 2 (the rating) per key; unrated rows contribute 0
key_and_rating = rdd_path.map(lambda rec: (rec[0], rec[2]))
rdd_rating = key_and_rating.reduceByKey(lambda total, value: total + value)
rdd_rating.collect()

                                                                                

[('14th_century::African_slave_trade', 3),
 ('14th_century::Rainbow', 34),
 ('14th_century::Sodium', 3),
 ('14th_century::Henry_David_Thoreau', 5),
 ('2004_Atlantic_hurricane_season::Ice_age', 3),
 ('2005_Atlantic_hurricane_season::Spain', 1),
 ('2005_Atlantic_hurricane_season::Jazz', 13),
 ('2005_Atlantic_hurricane_season::Liechtenstein', 7),
 ('2005_Atlantic_hurricane_season::Sweden', 0),
 ('2005_Atlantic_hurricane_season::Burundi', 2),
 ('2005_Atlantic_hurricane_season::Hydrogen_peroxide', 0),
 ('2005_Atlantic_hurricane_season::International_Court_of_Justice', 0),
 ('2005_Atlantic_hurricane_season::William_McKinley', 2),
 ('Aberdeen::Vampire_bat', 8),
 ('Aberdeen::Bread', 0),
 ('Aberdeen::Sparrowhawk', 0),
 ('Acceleration::Dresden', 2),
 ('Acceleration::Elephant', 1),
 ('Acceleration::Trout', 15),
 ('Acceleration::Potassium_iodide', 7),
 ('Achilles::Gold', 3),
 ('Achilles::Great_white_shark', 3),
 ('Achilles::Bongo_(antelope)', 2),
 ('Achilles_tendon::Great_white_shark', 2),
 ('Afri

In [12]:
# Sum of field 3 (the 0/1 "has a rating" flag) per key, i.e. the number of ratings
key_and_flag = rdd_path.map(lambda rec: (rec[0], rec[3]))
rdd_ratingsNum = key_and_flag.reduceByKey(lambda total, flag: total + flag)
rdd_ratingsNum.collect()

                                                                                

[('14th_century::African_slave_trade', 1),
 ('14th_century::Rainbow', 13),
 ('14th_century::Sodium', 1),
 ('14th_century::Henry_David_Thoreau', 1),
 ('2004_Atlantic_hurricane_season::Ice_age', 2),
 ('2005_Atlantic_hurricane_season::Spain', 1),
 ('2005_Atlantic_hurricane_season::Jazz', 4),
 ('2005_Atlantic_hurricane_season::Liechtenstein', 2),
 ('2005_Atlantic_hurricane_season::Sweden', 0),
 ('2005_Atlantic_hurricane_season::Burundi', 1),
 ('2005_Atlantic_hurricane_season::Hydrogen_peroxide', 0),
 ('2005_Atlantic_hurricane_season::International_Court_of_Justice', 0),
 ('2005_Atlantic_hurricane_season::William_McKinley', 1),
 ('Aberdeen::Vampire_bat', 2),
 ('Aberdeen::Bread', 0),
 ('Aberdeen::Sparrowhawk', 0),
 ('Acceleration::Dresden', 1),
 ('Acceleration::Elephant', 1),
 ('Acceleration::Trout', 4),
 ('Acceleration::Potassium_iodide', 2),
 ('Achilles::Gold', 1),
 ('Achilles::Great_white_shark', 1),
 ('Achilles::Bongo_(antelope)', 1),
 ('Achilles_tendon::Great_white_shark', 1),
 ('Africa

In [13]:
def divide(x,y):
    """Return ``x / y`` rounded with the built-in ``round``, or 0 when ``y`` is falsy.

    The falsy check guards against division by zero. Note that ``round``
    rounds exact halves to the nearest even integer (e.g. 2.5 -> 2).
    """
    if not y:
        return 0
    return round(x/y)
    
# Average rating per key = summed ratings / number of ratings.
# The two RDDs are joined on the key so the numerator and denominator are
# always passed to divide() in a fixed order. Division is neither commutative
# nor associative, so feeding it to reduceByKey over a union would make the
# result depend on the order in which Spark merges the two values.
rdd_rating2 = rdd_rating.join(rdd_ratingsNum) \
                .mapValues(lambda sum_and_count: divide(sum_and_count[0], sum_and_count[1]))
rdd_rating2.collect()

[('14th_century::African_slave_trade', 3),
 ('14th_century::Rainbow', 3),
 ('14th_century::Sodium', 3),
 ('14th_century::Henry_David_Thoreau', 5),
 ('2004_Atlantic_hurricane_season::Ice_age', 2),
 ('2005_Atlantic_hurricane_season::Spain', 1),
 ('2005_Atlantic_hurricane_season::Jazz', 3),
 ('2005_Atlantic_hurricane_season::Liechtenstein', 4),
 ('2005_Atlantic_hurricane_season::Sweden', 0),
 ('2005_Atlantic_hurricane_season::Burundi', 2),
 ('2005_Atlantic_hurricane_season::Hydrogen_peroxide', 0),
 ('2005_Atlantic_hurricane_season::International_Court_of_Justice', 0),
 ('2005_Atlantic_hurricane_season::William_McKinley', 2),
 ('Aberdeen::Vampire_bat', 4),
 ('Aberdeen::Bread', 0),
 ('Aberdeen::Sparrowhawk', 0),
 ('Acceleration::Dresden', 2),
 ('Acceleration::Elephant', 1),
 ('Acceleration::Trout', 4),
 ('Acceleration::Potassium_iodide', 4),
 ('Achilles::Gold', 3),
 ('Achilles::Great_white_shark', 3),
 ('Achilles::Bongo_(antelope)', 2),
 ('Achilles_tendon::Great_white_shark', 2),
 ('Africa:

In [14]:
import math

# Mean of field 1 (duration) per key, rounded up.
# Each record becomes (key, (duration, 1)) so sums and counts accumulate together.
duration_with_count = rdd_path.map(lambda rec: (rec[0], (rec[1], 1)))
duration_totals = duration_with_count.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
rdd_duration = duration_totals.mapValues(
    lambda sum_count: math.ceil(sum_count[0] / sum_count[1])
)
rdd_duration.collect()

                                                                                

[('14th_century::African_slave_trade', 131),
 ('14th_century::Rainbow', 182),
 ('14th_century::Sodium', 150),
 ('14th_century::Henry_David_Thoreau', 507),
 ('2004_Atlantic_hurricane_season::Ice_age', 109),
 ('2005_Atlantic_hurricane_season::Spain', 22),
 ('2005_Atlantic_hurricane_season::Jazz', 312),
 ('2005_Atlantic_hurricane_season::Liechtenstein', 309),
 ('2005_Atlantic_hurricane_season::Sweden', 53),
 ('2005_Atlantic_hurricane_season::Burundi', 148),
 ('2005_Atlantic_hurricane_season::Hydrogen_peroxide', 223),
 ('2005_Atlantic_hurricane_season::International_Court_of_Justice', 132),
 ('2005_Atlantic_hurricane_season::William_McKinley', 104),
 ('Aberdeen::Vampire_bat', 331),
 ('Aberdeen::Bread', 77),
 ('Aberdeen::Sparrowhawk', 206),
 ('Acceleration::Dresden', 59),
 ('Acceleration::Elephant', 40),
 ('Acceleration::Trout', 155),
 ('Acceleration::Potassium_iodide', 122),
 ('Achilles::Gold', 137),
 ('Achilles::Great_white_shark', 213),
 ('Achilles::Bongo_(antelope)', 159),
 ('Achilles_t

In [15]:
# Mean of field 4 (path length) per key, rounded up.
# Each record becomes (key, (length, 1)) so sums and counts accumulate together.
length_with_count = rdd_path.map(lambda rec: (rec[0], (rec[4], 1)))
length_totals = length_with_count.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
rdd_length = length_totals.mapValues(
    lambda sum_count: math.ceil(sum_count[0] / sum_count[1])
)
rdd_length.collect()

                                                                                

[('14th_century::African_slave_trade', 8),
 ('14th_century::Rainbow', 8),
 ('14th_century::Sodium', 6),
 ('14th_century::Henry_David_Thoreau', 27),
 ('2004_Atlantic_hurricane_season::Ice_age', 7),
 ('2005_Atlantic_hurricane_season::Spain', 4),
 ('2005_Atlantic_hurricane_season::Jazz', 12),
 ('2005_Atlantic_hurricane_season::Liechtenstein', 5),
 ('2005_Atlantic_hurricane_season::Sweden', 5),
 ('2005_Atlantic_hurricane_season::Burundi', 4),
 ('2005_Atlantic_hurricane_season::Hydrogen_peroxide', 5),
 ('2005_Atlantic_hurricane_season::International_Court_of_Justice', 8),
 ('2005_Atlantic_hurricane_season::William_McKinley', 5),
 ('Aberdeen::Vampire_bat', 8),
 ('Aberdeen::Bread', 6),
 ('Aberdeen::Sparrowhawk', 8),
 ('Acceleration::Dresden', 4),
 ('Acceleration::Elephant', 4),
 ('Acceleration::Trout', 10),
 ('Acceleration::Potassium_iodide', 10),
 ('Achilles::Gold', 6),
 ('Achilles::Great_white_shark', 14),
 ('Achilles::Bongo_(antelope)', 6),
 ('Achilles_tendon::Great_white_shark', 10),
 ('A

In [24]:
# Join the per-key rating values with the per-key duration values;
# each record becomes (key, (rating, duration)).
rdd_eda1 = rdd_rating2.join(rdd_duration)
rdd_eda1.collect()

[Stage 165:>                                                        (0 + 2) / 2]                                                                                

[('14th_century::African_slave_trade', (3, 131)),
 ('14th_century::Rainbow', (3, 182)),
 ('14th_century::Sodium', (3, 150)),
 ('14th_century::Henry_David_Thoreau', (5, 507)),
 ('2004_Atlantic_hurricane_season::Ice_age', (2, 109)),
 ('2005_Atlantic_hurricane_season::Spain', (1, 22)),
 ('2005_Atlantic_hurricane_season::Jazz', (3, 312)),
 ('2005_Atlantic_hurricane_season::Liechtenstein', (4, 309)),
 ('2005_Atlantic_hurricane_season::Sweden', (0, 53)),
 ('2005_Atlantic_hurricane_season::Burundi', (2, 148)),
 ('2005_Atlantic_hurricane_season::Hydrogen_peroxide', (0, 223)),
 ('2005_Atlantic_hurricane_season::International_Court_of_Justice', (0, 132)),
 ('2005_Atlantic_hurricane_season::William_McKinley', (2, 104)),
 ('Aberdeen::Vampire_bat', (4, 331)),
 ('Aberdeen::Bread', (0, 77)),
 ('Aberdeen::Sparrowhawk', (0, 206)),
 ('Acceleration::Dresden', (2, 59)),
 ('Acceleration::Elephant', (1, 40)),
 ('Acceleration::Trout', (4, 155)),
 ('Acceleration::Potassium_iodide', (4, 122)),
 ('Achilles::Gol

In [27]:
def _flatten_eda_record(record):
    """Reshape (key, ((rating, duration), length)) into (key, duration, length, rating)."""
    key, nested = record
    rating_and_duration, length = nested[0], nested[1]
    return (key, rating_and_duration[1], length, rating_and_duration[0])

# Attach the per-key path length, then flatten the nested join result
rdd_eda2 = rdd_eda1.join(rdd_length).map(_flatten_eda_record)
rdd_eda2.collect()

                                                                                

[('14th_century::African_slave_trade', 131, 8, 3),
 ('14th_century::Rainbow', 182, 8, 3),
 ('14th_century::Sodium', 150, 6, 3),
 ('14th_century::Henry_David_Thoreau', 507, 27, 5),
 ('2004_Atlantic_hurricane_season::Ice_age', 109, 7, 2),
 ('2005_Atlantic_hurricane_season::Spain', 22, 4, 1),
 ('2005_Atlantic_hurricane_season::Jazz', 312, 12, 3),
 ('2005_Atlantic_hurricane_season::Liechtenstein', 309, 5, 4),
 ('2005_Atlantic_hurricane_season::Sweden', 53, 5, 0),
 ('2005_Atlantic_hurricane_season::Burundi', 148, 4, 2),
 ('2005_Atlantic_hurricane_season::Hydrogen_peroxide', 223, 5, 0),
 ('2005_Atlantic_hurricane_season::International_Court_of_Justice', 132, 8, 0),
 ('2005_Atlantic_hurricane_season::William_McKinley', 104, 5, 2),
 ('Aberdeen::Vampire_bat', 331, 8, 4),
 ('Aberdeen::Bread', 77, 6, 0),
 ('Aberdeen::Sparrowhawk', 206, 8, 0),
 ('Acceleration::Dresden', 59, 4, 2),
 ('Acceleration::Elephant', 40, 4, 1),
 ('Acceleration::Trout', 155, 10, 4),
 ('Acceleration::Potassium_iodide', 122, 

In [28]:
def isUnrated(x):
    """Return True when the last field of ``x`` equals 0."""
    last_field = x[-1]
    return last_field == 0


def isRated(x):
    """Return the logical negation of ``isUnrated(x)``."""
    unrated = isUnrated(x)
    return not unrated

# Split the joined records into those whose last field is 0 and the rest
rdd_unrated = rdd_eda2.filter(isUnrated)
rdd_rated = rdd_eda2.filter(isRated)

In [29]:
# Bring every record of the unrated subset to the driver for inspection
rdd_unrated.collect()

                                                                                

[('2005_Atlantic_hurricane_season::Sweden', 53, 5, 0),
 ('2005_Atlantic_hurricane_season::Hydrogen_peroxide', 223, 5, 0),
 ('2005_Atlantic_hurricane_season::International_Court_of_Justice', 132, 8, 0),
 ('Aberdeen::Bread', 77, 6, 0),
 ('Aberdeen::Sparrowhawk', 206, 8, 0),
 ('Africa::Australia', 20, 3, 0),
 ('Africa::Sauropodomorpha', 137, 5, 0),
 ('Africa::Poverty', 47, 2, 0),
 ('Age_of_Enlightenment::Art', 42, 4, 0),
 ('Agriculture::Tropical_Storm_Delta_(2005)', 341, 7, 0),
 ('AIDS::Steel', 53, 4, 0),
 ('AIDS::Paris', 40, 4, 0),
 ('AIDS::Napoleon_I_of_France', 68, 7, 0),
 ('AIDS::Ethics', 21, 3, 0),
 ('Aircraft::Bangladesh', 65, 3, 0),
 ('Aircraft::United_States_Constitution', 45, 6, 0),
 ('Algorithm::Life', 89, 4, 0),
 ('Algorithm::Argon', 50, 7, 0),
 ('Allegory_in_the_Middle_Ages::Anarchism', 50, 5, 0),
 ('Aluminium::Hinduism', 166, 4, 0),
 ('Aluminium_chloride::Neptunium', 60, 4, 0),
 ('American_Civil_War::Nouakchott', 80, 4, 0),
 ('Anaconda::Allosaurus', 102, 8, 0),
 ('Animal::Apo

In [30]:
# Bring every record of the rated subset to the driver for inspection
rdd_rated.collect()

                                                                                

[('14th_century::African_slave_trade', 131, 8, 3),
 ('14th_century::Rainbow', 182, 8, 3),
 ('14th_century::Sodium', 150, 6, 3),
 ('14th_century::Henry_David_Thoreau', 507, 27, 5),
 ('2004_Atlantic_hurricane_season::Ice_age', 109, 7, 2),
 ('2005_Atlantic_hurricane_season::Spain', 22, 4, 1),
 ('2005_Atlantic_hurricane_season::Jazz', 312, 12, 3),
 ('2005_Atlantic_hurricane_season::Liechtenstein', 309, 5, 4),
 ('2005_Atlantic_hurricane_season::Burundi', 148, 4, 2),
 ('2005_Atlantic_hurricane_season::William_McKinley', 104, 5, 2),
 ('Aberdeen::Vampire_bat', 331, 8, 4),
 ('Acceleration::Dresden', 59, 4, 2),
 ('Acceleration::Elephant', 40, 4, 1),
 ('Acceleration::Trout', 155, 10, 4),
 ('Acceleration::Potassium_iodide', 122, 10, 4),
 ('Achilles::Gold', 137, 6, 3),
 ('Achilles::Great_white_shark', 213, 14, 3),
 ('Achilles::Bongo_(antelope)', 159, 6, 2),
 ('Achilles_tendon::Great_white_shark', 150, 10, 2),
 ('Africa::England', 76, 5, 1),
 ('Africa::Mexico', 36, 4, 1),
 ('Africa::Potato', 148, 5,

In [72]:
# Name the four tuple fields, then discard the key column so only
# duration, pathLength and label remain.
column_names = ["source::target","duration","pathLength","label"]
df = (
    rdd_rated
    .toDF(column_names)
    .drop("source::target")
)
df.printSchema()
df.show(truncate=False)

                                                                                

root
 |-- duration: long (nullable = true)
 |-- pathLength: long (nullable = true)
 |-- label: long (nullable = true)

+--------+----------+-----+
|duration|pathLength|label|
+--------+----------+-----+
|131     |8         |3    |
|182     |8         |3    |
|150     |6         |3    |
|507     |27        |5    |
|109     |7         |2    |
|22      |4         |1    |
|312     |12        |3    |
|309     |5         |4    |
|148     |4         |2    |
|104     |5         |2    |
|331     |8         |4    |
|59      |4         |2    |
|40      |4         |1    |
|155     |10        |4    |
|122     |10        |4    |
|137     |6         |3    |
|213     |14        |3    |
|159     |6         |2    |
|150     |10        |2    |
|76      |5         |1    |
+--------+----------+-----+
only showing top 20 rows



[Stage 1104:>                                                       (0 + 1) / 1]                                                                                

In [73]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

#assembler = VectorAssembler(inputCols=["duration","pathLength"],
#                       outputCol="features_to_scale")
#vectorized = assembler.transform(df)
#vectorized = vectorized.drop("duration","pathLength")

#vectorized.show(truncate=False)

In [83]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Linear Regression reading the scaled vector from "features"
regressor = LinearRegression(labelCol="label",featuresCol="features", regParam=0.1)

# (MLlib works only with one column vector)
# assembler combines a given list of columns into a single vector column
assembler = VectorAssembler(inputCols=["duration","pathLength"], outputCol="features_to_scale")

# Normalizing features: reads "features_to_scale", writes "features"
scaler = MinMaxScaler(inputCol="features_to_scale", outputCol="features")

# Initializing pipeline: assemble -> scale -> regress
stages = [assembler, scaler, regressor]
pipeline = Pipeline().setStages(stages)

# Splitting dataset in train&test.
# A fixed seed keeps the split, and therefore every metric computed from it,
# the same from one run of the notebook to the next.
SPLIT_SEED = 42
train_set, test_set = df.randomSplit([0.8,0.2], seed=SPLIT_SEED)

# Model training
pipeline_model = pipeline.fit(train_set)

# Make predictions
predictions = pipeline_model.transform(test_set)

IllegalArgumentException: features does not exist. Available: duration, pathLength, label

In [None]:
# Standalone linear regression on the unscaled features.
# featuresCol must name the column the assembler below actually produces,
# otherwise fit() cannot find its input vector.
lr = LinearRegression(labelCol="label",featuresCol="features_to_scale", maxIter=10, regParam=0.3, elasticNetParam=0.8)

assembler = VectorAssembler(inputCols=["duration","pathLength"],
                       outputCol="features_to_scale")
vectorized = assembler.transform(df)
vectorized = vectorized.drop("duration","pathLength")

# Fit the model on the assembled frame (no `training` frame is defined in this notebook)
lrModel = lr.fit(vectorized)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [79]:
# Show 10 example rows
predictions.select("prediction", "label", "features").show(10)
# Named r2_evaluator rather than `eval` so the built-in eval() is not shadowed
# for the rest of the kernel session.
r2_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
# R2 (the metric is already set on the evaluator, so no param override is needed)
r2 = r2_evaluator.evaluate(predictions)
print("r2: %.3f" %r2)
predictions.collect()

                                                                                

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 1.657228626999111|    3|[4.28742925741725...|
|1.6574934668429233|    1|[8.57485851483450...|
|1.6574934668429233|    1|[8.57485851483450...|
|1.6574934668429233|    1|[8.57485851483450...|
|1.6574934668429233|    3|[8.57485851483450...|
|1.6574934668429233|    5|[8.57485851483450...|
|1.6577583066867354|    1|[1.28622877722517...|
|1.6577583066867354|    5|[1.28622877722517...|
|1.6577583066867354|    5|[1.28622877722517...|
|1.7687695992137964|    5|[1.28622877722517...|
+------------------+-----+--------------------+
only showing top 10 rows



                                                                                

r2: 0.223


                                                                                

[Row(duration=3, pathLength=2, label=3, features_to_scale=DenseVector([3.0, 2.0]), features=DenseVector([0.0, 0.0]), prediction=1.657228626999111),
 Row(duration=4, pathLength=2, label=1, features_to_scale=DenseVector([4.0, 2.0]), features=DenseVector([0.0001, 0.0]), prediction=1.6574934668429233),
 Row(duration=4, pathLength=2, label=1, features_to_scale=DenseVector([4.0, 2.0]), features=DenseVector([0.0001, 0.0]), prediction=1.6574934668429233),
 Row(duration=4, pathLength=2, label=1, features_to_scale=DenseVector([4.0, 2.0]), features=DenseVector([0.0001, 0.0]), prediction=1.6574934668429233),
 Row(duration=4, pathLength=2, label=3, features_to_scale=DenseVector([4.0, 2.0]), features=DenseVector([0.0001, 0.0]), prediction=1.6574934668429233),
 Row(duration=4, pathLength=2, label=5, features_to_scale=DenseVector([4.0, 2.0]), features=DenseVector([0.0001, 0.0]), prediction=1.6574934668429233),
 Row(duration=5, pathLength=2, label=1, features_to_scale=DenseVector([5.0, 2.0]), features=

In [None]:
# Stop context
#spark.stop()