In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


# Reuse the active Spark session if there is one, otherwise start a new one.
# SparkSession has to be imported in this cell: it is the first cell of the
# notebook, so on a fresh kernel no later import has run yet.
spark = SparkSession.builder.getOrCreate()

In [50]:
# load modules
from pyspark.sql import SparkSession
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.evaluation import MulticlassMetrics

import os

In [2]:
# Read the CSV into a Spark DataFrame, taking column names from the header row
# and letting Spark infer the column types.
# With the default decoding the first track name is displayed with a U+FFFD
# replacement character where an accented letter should be, so the file is not
# UTF-8. ISO-8859-1 is assumed here -- TODO confirm against the actual file.
filename = 'top50.csv'
df = spark.read.csv(filename, inferSchema=True, header=True, encoding='ISO-8859-1')

In [4]:
# Preview the first two rows to check the parsed columns and values.
df.show(2)

+---+----------+------------+--------------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
|_c0|Track.Name| Artist.Name|         Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity|
+---+----------+------------+--------------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
|  1|  Se�orita|Shawn Mendes|  canadian pop|             117|    55|          76|            -6|       8|      75|    191|             4|           3|        79|
|  2|     China|    Anuel AA|reggaeton flow|             105|    81|          79|            -4|       8|      61|    302|             8|           9|        92|
+---+----------+------------+--------------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
only showing top 2 rows



In [5]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

In [34]:
# Keep the DataFrame's column names in a list and display it.
df_feats = df.columns
df_feats

['_c0',
 'Track.Name',
 'Artist.Name',
 'Genre',
 'Beats.Per.Minute',
 'Energy',
 'Danceability',
 'Loudness..dB..',
 'Liveness',
 'Valence.',
 'Length.',
 'Acousticness..',
 'Speechiness.',
 'Popularity']

In [37]:
# Give the columns whose names contain dots short, plain names.
# Columns that are not listed keep their original names.
df2 = (
    df
    .withColumnRenamed("Beats.Per.Minute", "BPM")
    .withColumnRenamed("Loudness..dB..", "Loudness")
    .withColumnRenamed("Valence.", "Valence")
    .withColumnRenamed("Length.", "Length")
    .withColumnRenamed("Acousticness..", "Acousticness")
    .withColumnRenamed("Speechiness.", "Speechiness")
)

In [31]:
# Assembler that packs the numeric columns below into a single vector column
# named 'features'. The column names are the renamed ones (BPM, Loudness, ...).
vecAssembler = VectorAssembler(
    inputCols=[
        'BPM',
        'Energy',
        'Danceability',
        'Loudness',
        'Liveness',
        'Valence',
        'Length',
        'Acousticness',
        'Speechiness',
        'Popularity',
    ],
    outputCol='features',
)
# Bare expression so the cell still displays the assembler.
vecAssembler

VectorAssembler_d4c49c3b3ed6

In [39]:
# Apply the assembler to the renamed frame; the result carries the assembler's
# output column ('features') alongside the existing columns.
transformed =vecAssembler.transform(df2)

In [40]:
# Preview two rows to check the assembled 'features' column.
transformed.show(2)

+---+----------+------------+--------------+---+------+------------+--------+--------+-------+------+------------+-----------+----------+--------------------+
|_c0|Track.Name| Artist.Name|         Genre|BPM|Energy|Danceability|Loudness|Liveness|Valence|Length|Acousticness|Speechiness|Popularity|            features|
+---+----------+------------+--------------+---+------+------------+--------+--------+-------+------+------------+-----------+----------+--------------------+
|  1|  Se�orita|Shawn Mendes|  canadian pop|117|    55|          76|      -6|       8|     75|   191|           4|          3|        79|[55.0,76.0,8.0,75...|
|  2|     China|    Anuel AA|reggaeton flow|105|    81|          79|      -4|       8|     61|   302|           8|          9|        92|[81.0,79.0,8.0,61...|
+---+----------+------------+--------------+---+------+------------+--------+--------+-------+------+------------+-----------+----------+--------------------+
only showing top 2 rows



In [41]:
from pyspark.ml.feature import StandardScaler
# Scale the assembled 'features' vector into a new 'scaled_features' column.
# With StandardScaler's default settings each feature is divided by its
# standard deviation and is not mean-centered.
Scalerizer = StandardScaler().setInputCol("features").setOutputCol("scaled_features")

# This cell rebinds `transformed` to its own output. Fitting raises if the
# output column already exists, so drop any 'scaled_features' column left by a
# previous run (a no-op on the first run) to keep the cell safe to re-run.
unscaled = transformed.drop("scaled_features")
transformed = Scalerizer.fit(unscaled).transform(unscaled)

In [42]:
# Keep only the genre and the scaled feature vector, and convert each Row of
# the underlying RDD into a plain (genre, scaled_features) tuple.
dataRdd = transformed.select("Genre", "scaled_features").rdd.map(tuple)

In [43]:
# Pull two records to the driver to inspect the (genre, vector) tuples.
dataRdd.take(2)

[('canadian pop', DenseVector([3.8646, 6.3706, 0.7195, 3.3578, 17.5888])),
 ('reggaeton flow', DenseVector([5.6914, 6.622, 0.7195, 2.731, 20.4832]))]

In [44]:
# Build the RDD of LabeledPoint records used by the RDD-based MLlib classifier.
#
# Label: 1 when the genre string contains POSITIVE_GENRE_KEYWORD, otherwise 0.
# NOTE(review): the label test used to be `row[0] == 'M'`, but Genre holds
# values such as 'canadian pop' and 'reggaeton flow', so no row could match and
# every label was 0. "pop vs. not pop" is an assumed target -- confirm the
# intended positive class before relying on the model.
POSITIVE_GENRE_KEYWORD = 'pop'

# Each record is (genre, scaled_features).
# - A missing genre is treated as the negative class instead of raising.
# - The scaled vector is produced by pyspark.ml, so it is converted to a NumPy
#   array before being wrapped in a pyspark.mllib dense vector.
lp = dataRdd.map(
    lambda row: LabeledPoint(
        1 if POSITIVE_GENRE_KEYWORD in (row[0] or '').lower() else 0,
        Vectors.dense(row[1].toArray()),
    )
)

In [45]:
# Randomly split the labeled points into training and test sets using weights
# 0.7 and 0.3, with a fixed seed (314) passed to the split.
training, test = lp.randomSplit([0.7, 0.3], seed = 314)

In [49]:
# Train the logistic regression model on the training split.
# LogisticRegressionWithLBFGS comes from the "load modules" cell, which must be
# run before this one: the recorded NameError is from execution In [49], while
# that imports cell was executed as In [50].
model = LogisticRegressionWithLBFGS.train(training)

NameError: name 'LogisticRegressionWithLBFGS' is not defined