<a href="https://colab.research.google.com/github/pawara101/pyspark-trials/blob/main/pySpark_tutorial_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install PySpark into the running kernel's environment. The version is pinned
# to the release this notebook was last executed with so re-runs stay reproducible.
%pip install pyspark==3.4.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=61f6f2116b98f97759dd42bc0505c7d211cc6aa7f06d14b7d8325896b57c7255
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
from pyspark.sql import SparkSession

# Session entry point used by every later cell; getOrCreate() hands back an
# already-running session when one exists instead of starting another.
spark = (
    SparkSession.builder
    .appName("ML_pipeline_3")
    .getOrCreate()
)

In [3]:
# Load the music dataset (path is under /content/drive — presumably a mounted
# Google Drive; no mount cell is visible in this notebook). header=True takes
# the column names from the first CSV row. No schema inference is requested,
# so the columns arrive as strings (see the dtypes listing further down).
data = spark.read.csv(
    '/content/drive/MyDrive/Colab Notebooks/pyspark/tcc_ceds_music.csv',
    header=True,
)

In [4]:
# Preview the freshly loaded DataFrame as a text table.
data.show()

+---+--------------------+--------------------+------------+-----+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+----------+------------------+
|_c0|         artist_name|          track_name|release_date|genre|              lyrics|len|              dating|            violence|          world/life|          night/time|  shake the audience|       family/gospel|            romantic|       communication|             obscene|               music|     movement/places|light/visual perceptions|    family/spiritual|          like/girls|             sadness|       

In [5]:
## Encode genre values
from pyspark.ml.feature import StringIndexer
# Fit an indexer on the `genre` strings and write the resulting numeric index
# into a new `label` column (it shows up as a double in the dtypes listing).
indexer = StringIndexer(inputCol='genre', outputCol='label')
indexer_fitted = indexer.fit(data)
# `data` is rebound to the transformed frame.
# NOTE(review): re-running this cell without reloading `data` is expected to
# fail because the `label` output column would already exist — confirm.
data = indexer_fitted.transform(data)
data.show()

+---+--------------------+--------------------+------------+-----+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+----------+------------------+-----+
|_c0|         artist_name|          track_name|release_date|genre|              lyrics|len|              dating|            violence|          world/life|          night/time|  shake the audience|       family/gospel|            romantic|       communication|             obscene|               music|     movement/places|light/visual perceptions|    family/spiritual|          like/girls|             sadness| 

In [6]:
# List the distinct values of the `topic` column, collected to the driver as Rows.
data.select('topic').distinct().collect()

[Row(topic='romantic'),
 Row(topic='music'),
 Row(topic='violence'),
 Row(topic='feelings'),
 Row(topic='sadness'),
 Row(topic='night/time'),
 Row(topic='obscene'),
 Row(topic='world/life')]

In [7]:
## Encode topics
# Index the `topic` strings into a numeric `topic_enc` column.
# This rebinds `indexer` and `indexer_fitted`, so the genre indexer objects
# created earlier are no longer reachable under those names.
indexer = StringIndexer(inputCol='topic', outputCol='topic_enc')
indexer_fitted = indexer.fit(data)
# `data` is rebound again; it now carries both `label` and `topic_enc`.
data = indexer_fitted.transform(data)
data.show()

+---+--------------------+--------------------+------------+-----+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+----------+------------------+-----+---------+
|_c0|         artist_name|          track_name|release_date|genre|              lyrics|len|              dating|            violence|          world/life|          night/time|  shake the audience|       family/gospel|            romantic|       communication|             obscene|               music|     movement/places|light/visual perceptions|    family/spiritual|          like/girls|            

In [8]:
# Count how many distinct artist names appear in the dataset.
data.select('artist_name').distinct().count()

5426

In [9]:
# Inspect the (column, type) pairs after both encodings were added.
data.dtypes

[('_c0', 'string'),
 ('artist_name', 'string'),
 ('track_name', 'string'),
 ('release_date', 'string'),
 ('genre', 'string'),
 ('lyrics', 'string'),
 ('len', 'string'),
 ('dating', 'string'),
 ('violence', 'string'),
 ('world/life', 'string'),
 ('night/time', 'string'),
 ('shake the audience', 'string'),
 ('family/gospel', 'string'),
 ('romantic', 'string'),
 ('communication', 'string'),
 ('obscene', 'string'),
 ('music', 'string'),
 ('movement/places', 'string'),
 ('light/visual perceptions', 'string'),
 ('family/spiritual', 'string'),
 ('like/girls', 'string'),
 ('sadness', 'string'),
 ('feelings', 'string'),
 ('danceability', 'string'),
 ('loudness', 'string'),
 ('acousticness', 'string'),
 ('instrumentalness', 'string'),
 ('valence', 'string'),
 ('energy', 'string'),
 ('topic', 'string'),
 ('age', 'string'),
 ('label', 'double'),
 ('topic_enc', 'double')]

In [10]:
## Stage01
# Tokenize the lyrics. RegexTokenizer is imported as well but is not used in
# any of the cells visible in this notebook.
from pyspark.ml.feature import Tokenizer, RegexTokenizer
# Turn each `lyrics` string into an array of string tokens in `lyrics_token`.
tokenizer = Tokenizer(inputCol="lyrics", outputCol="lyrics_token")
# Tokenizer needs no fitting; transform() directly appends the new column.
tokenized_data = tokenizer.transform(data)

In [11]:
# Preview the frame with the added `lyrics_token` column.
tokenized_data.show()

+---+--------------------+--------------------+------------+-----+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+----------+------------------+-----+---------+--------------------+
|_c0|         artist_name|          track_name|release_date|genre|              lyrics|len|              dating|            violence|          world/life|          night/time|  shake the audience|       family/gospel|            romantic|       communication|             obscene|               music|     movement/places|light/visual perceptions|    family/spiritual|          li

In [12]:
## Word2Vec
from pyspark.ml.feature import Word2Vec

# Embed every token list as a small dense vector:
#   vectorSize=3 -> each row's `lyric_values` has three components
#   minCount=0   -> no minimum frequency is required for a token to be kept
word2Vec = Word2Vec(
    vectorSize=3,
    minCount=0,
    inputCol="lyrics_token",
    outputCol="lyric_values",
)

# Train the embedding on the tokenized lyrics, then apply it to the same frame.
model = word2Vec.fit(tokenized_data)
word2vec_data = model.transform(tokenized_data)

In [13]:
# Preview the frame with the added `lyric_values` embedding column.
word2vec_data.show()

+---+--------------------+--------------------+------------+-----+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+----------+------------------+-----+---------+--------------------+--------------------+
|_c0|         artist_name|          track_name|release_date|genre|              lyrics|len|              dating|            violence|          world/life|          night/time|  shake the audience|       family/gospel|            romantic|       communication|             obscene|               music|     movement/places|light/visual perceptions|    family/s

In [14]:
# Inspect column types; `danceability` is still a string at this point.
word2vec_data.dtypes

[('_c0', 'string'),
 ('artist_name', 'string'),
 ('track_name', 'string'),
 ('release_date', 'string'),
 ('genre', 'string'),
 ('lyrics', 'string'),
 ('len', 'string'),
 ('dating', 'string'),
 ('violence', 'string'),
 ('world/life', 'string'),
 ('night/time', 'string'),
 ('shake the audience', 'string'),
 ('family/gospel', 'string'),
 ('romantic', 'string'),
 ('communication', 'string'),
 ('obscene', 'string'),
 ('music', 'string'),
 ('movement/places', 'string'),
 ('light/visual perceptions', 'string'),
 ('family/spiritual', 'string'),
 ('like/girls', 'string'),
 ('sadness', 'string'),
 ('feelings', 'string'),
 ('danceability', 'string'),
 ('loudness', 'string'),
 ('acousticness', 'string'),
 ('instrumentalness', 'string'),
 ('valence', 'string'),
 ('energy', 'string'),
 ('topic', 'string'),
 ('age', 'string'),
 ('label', 'double'),
 ('topic_enc', 'double'),
 ('lyrics_token', 'array<string>'),
 ('lyric_values', 'vector')]

In [15]:
# Convert the danceability column from string to double so it can be fed to
# the VectorAssembler as a numeric feature.
word2vec_data = word2vec_data.withColumn("danceability",word2vec_data.danceability.cast('double'))

In [16]:
### Vector assembler
# `Vectors` is imported here but not used in any cell visible in this notebook.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Concatenate the lyric embedding (3 values), the encoded topic and the
# numeric danceability into a single 5-element `features` vector per row.
assembler = VectorAssembler(
    inputCols=["lyric_values", "topic_enc","danceability"],
    outputCol="features")

# VectorAssembler is a plain transformer: no fit step is needed.
output = assembler.transform(word2vec_data)

In [17]:
# Show only the first row to check that the `features` column was added.
output.show(1)

+---+-----------+--------------------+------------+-----+--------------------+---+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+------------------+------------------+--------------------+-------------------+--------------------+------------------------+--------------------+--------------------+-----------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+-------------------+-------+---+-----+---------+--------------------+--------------------+--------------------+
|_c0|artist_name|          track_name|release_date|genre|              lyrics|len|              dating|           violence|          world/life|          night/time|  shake the audience|      family/gospel|          romantic|     communication|             obscene|              music|     movement/places|light/visual perceptions|    family/spiritual|          like/girls|      

In [18]:
# Look at the assembled feature vector of the first row.
output.head()['features']

DenseVector([0.2108, 0.2063, -0.0596, 0.0, 0.3577])

In [39]:
# Convert the `label` column (a double produced by StringIndexer) to integer.
# `output` is rebound to the frame with the re-typed column.
output = output.withColumn("label",output.label.cast('int'))

In [40]:
# Confirm the casts: `danceability` should be double and `label` int.
output.dtypes

[('_c0', 'string'),
 ('artist_name', 'string'),
 ('track_name', 'string'),
 ('release_date', 'string'),
 ('genre', 'string'),
 ('lyrics', 'string'),
 ('len', 'string'),
 ('dating', 'string'),
 ('violence', 'string'),
 ('world/life', 'string'),
 ('night/time', 'string'),
 ('shake the audience', 'string'),
 ('family/gospel', 'string'),
 ('romantic', 'string'),
 ('communication', 'string'),
 ('obscene', 'string'),
 ('music', 'string'),
 ('movement/places', 'string'),
 ('light/visual perceptions', 'string'),
 ('family/spiritual', 'string'),
 ('like/girls', 'string'),
 ('sadness', 'string'),
 ('feelings', 'string'),
 ('danceability', 'double'),
 ('loudness', 'string'),
 ('acousticness', 'string'),
 ('instrumentalness', 'string'),
 ('valence', 'string'),
 ('energy', 'string'),
 ('topic', 'string'),
 ('age', 'string'),
 ('label', 'int'),
 ('topic_enc', 'double'),
 ('lyrics_token', 'array<string>'),
 ('lyric_values', 'vector'),
 ('features', 'vector')]

In [20]:
# Hold out roughly 20% of the rows for testing. randomSplit samples rows at
# random, so a fixed seed is passed to keep the split — and every metric
# computed from it — repeatable across re-runs of the notebook.
train, test = output.randomSplit([0.8, 0.2], seed=42)

In [42]:
## Logistic regression (a classifier; this cell does not fit a linear regression)
from pyspark.ml.classification import LogisticRegression
# Classify the indexed genre (`label`) from the assembled `features` vector,
# with at most 10 iterations and elastic-net regularisation
# (regParam=0.3, elasticNetParam=0.8).
# NOTE(review): the saved outputs of the following cells still mention
# LinearRegressionModel / LinearRegressionTrainingSummary, which suggests they
# were produced by an earlier version of this cell — re-run to refresh them.
LR = LogisticRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = LR.fit(train)

In [22]:
# Echo the fitted model's repr (uid and number of features).
lr_model

LinearRegressionModel: uid=LinearRegression_dd6413f56a48, numFeatures=5

In [23]:
# Keep a handle on the training summary exposed by the fitted model and echo it.
trainingSummary = lr_model.summary
trainingSummary

<pyspark.ml.regression.LinearRegressionTrainingSummary at 0x7f5d1844a7a0>

In [24]:
# Score the held-out split; the result carries an added `prediction` column.
predictions = lr_model.transform(test)

In [25]:
# Echo the DataFrame repr to see the schema, including `prediction`.
predictions

DataFrame[_c0: string, artist_name: string, track_name: string, release_date: string, genre: string, lyrics: string, len: string, dating: string, violence: string, world/life: string, night/time: string, shake the audience: string, family/gospel: string, romantic: string, communication: string, obscene: string, music: string, movement/places: string, light/visual perceptions: string, family/spiritual: string, like/girls: string, sadness: string, feelings: string, danceability: double, loudness: string, acousticness: string, instrumentalness: string, valence: string, energy: string, topic: string, age: string, label: int, topic_enc: double, lyrics_token: array<string>, lyric_values: vector, features: vector, prediction: double]

In [26]:
# Compare true labels with predictions side by side for the first rows.
predictions.select('label','features','prediction').show()

+-----+--------------------+------------------+
|label|            features|        prediction|
+-----+--------------------+------------------+
|    0|[0.21079468721229...| 2.007825219438064|
|    0|[0.17419333824956...| 2.179118395633684|
|    0|[-0.2041475899929...|2.0534297223833082|
|    0|[0.22504331804686...|2.0290859240659036|
|    0|[0.25109757013807...|1.9441437666888928|
|    0|[0.25304728383631...|2.0167437741306693|
|    0|[0.12852056049520...|2.1455699823405614|
|    0|[0.19742831587791...| 2.107509628233444|
|    0|[-0.1200070958972...|1.9832468152419325|
|    0|[0.18378049411943...|1.9898893924260799|
|    0|[0.08829297942499...| 2.113023249943736|
|    0|[0.26060245206786...| 1.946365000714981|
|    0|[0.06222542249323...| 2.234649279134645|
|    0|[0.22975372511780...|2.0615139679304164|
|    0|[0.36593972773213...|1.9930907236140043|
|    0|[0.21442885021013...|1.9688703236808387|
|    0|[0.21207200378483...|2.1734960261831215|
|    0|[0.27991349115429...| 2.158747167

In [27]:
# Accuracy = share of test rows whose prediction equals the label exactly.
# The equality test is only meaningful when `prediction` holds discrete class
# indices rather than continuous values.
n_correct = predictions.filter(predictions.label == predictions.prediction).count()
n_total = predictions.count()
accuracy = n_correct / float(n_total)
print("Accuracy : ",accuracy)

Accuracy :  0.0


## ML Pipeline

In [28]:
# Reload the raw CSV so the pipeline starts from the untouched columns; this
# rebinds `data`, dropping the `label` / `topic_enc` columns added earlier.
data = spark.read.csv(
    '/content/drive/MyDrive/Colab Notebooks/pyspark/tcc_ceds_music.csv',
    header=True,
)

In [43]:
## Stage 01: index the `topic` strings into `topic_enc`
Stage01 = StringIndexer(inputCol='topic', outputCol='topic_enc')

## Stage 02: index the `genre` strings into the target column `label`
Stage02 = StringIndexer(inputCol='genre', outputCol='label')
## Stage 03: split the lyrics into tokens
Stage03 = Tokenizer(inputCol="lyrics", outputCol="lyrics_token")
## Stage 04: embed the token lists with Word2Vec (3-component vectors)
Stage04 = Word2Vec(vectorSize=3, minCount=0, inputCol="lyrics_token", outputCol="lyric_values")
## Stage 05: assemble the feature vector
## (unlike the manual assembler earlier in the notebook, `danceability` is not included)
Stage05 = VectorAssembler(
    inputCols=["lyric_values", "topic_enc"],
    outputCol="features")
## Stage 06: logistic regression classifier with default hyperparameters
from pyspark.ml.classification import LogisticRegression
Stage06 = LogisticRegression(featuresCol = 'features', labelCol='label')

from pyspark.ml import Pipeline

# set up the pipeline; despite the variable name, the final stage is a classifier
regression_pipeline = Pipeline(stages= [Stage01, Stage02, Stage03, Stage04, Stage05,Stage06])

# fit the pipeline on the full dataset (no train/test split is made here);
# this rebinds `model`, which previously held the fitted Word2Vec model
model = regression_pipeline.fit(data)
# transform the same data the pipeline was fitted on, so any metric computed
# from `sample_data_train` is a training-set figure
sample_data_train = model.transform(data)

In [46]:
# Preview the pipeline output, including the columns added by each stage.
sample_data_train.show()

+---+--------------------+--------------------+------------+-----+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+----------+------------------+---------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|_c0|         artist_name|          track_name|release_date|genre|              lyrics|len|              dating|            violence|          world/life|          night/time|  shake the audience|       family/gospel|            romantic|       communication|             obscene|     

In [51]:
# Evaluate model performance using accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# `sample_data_train` holds predictions on the very data the pipeline was
# fitted on, so the number below is training accuracy, not held-out accuracy.
accuracy = evaluator.evaluate(sample_data_train)

# Print accuracy as a percentage with two decimals
print("Accuracy: {:.2f}".format(accuracy*100))

Accuracy: 25.59
