In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# spark = SparkSession.builder.appName("feat-eng").getOrCreate()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("feat-eng") \
    .master("local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/18 21:57:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/18 21:57:05 WARN Utils: Service 'SparkUI' could not bind on port 4050. Attempting port 4051.


In [3]:
# pip show pyspark


In [4]:
BASE_PATH = '../dataset/'
## DataFrame
anime_df_parsed = spark.read.csv(BASE_PATH + 'parsed_anime.csv', header=True, inferSchema=True)
anime_df_parsed.printSchema()

                                                                                

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: string (nullable = true)
 |-- aired_to: integer (nullable = true)



In [5]:
# Multi-hot encode genres
from pyspark.ml.feature import StringIndexer

In [6]:
# split: Splits the genre column on commas. This converts the genre string into an array of genres.
# explode: Takes the array of genres and creates a new row for each genre. This results in one row per genre item.
# trim: Removes any leading or trailing whitespace

genre_transform_df = anime_df_parsed \
    .withColumn('genre_item', explode(split(col('genre'), '[,]'))) \
    .withColumn('genre_item', trim(col('genre_item')))
genre_transform_df.show(6)    ## change genre_item to array

+--------+--------------------+--------------------+-----+--------+------+-------+--------------------------+--------------------+--------------------+----------+----------+------------+
|anime_id|                name|               genre| type|episodes|rating|members|            japanese_title|               aired|           image_url|aired_from|  aired_to|  genre_item|
+--------+--------------------+--------------------+-----+--------+------+-------+--------------------------+--------------------+--------------------+----------+----------+------------+
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 200630|                君の名は。|        Aug 26, 2016|https://cdn.myani...|1472140800|1472140800|       Drama|
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 200630|                君の名は。|        Aug 26, 2016|https://cdn.myani...|1472140800|1472140800|     Romance|
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 

In [19]:
genre_transform_df.select(['anime_id', 'genre_item']).show(10)

+--------+------------+
|anime_id|  genre_item|
+--------+------------+
|   32281|       Drama|
|   32281|     Romance|
|   32281|      School|
|   32281|Supernatural|
|    5114|      Action|
|    5114|   Adventure|
|    5114|       Drama|
|    5114|     Fantasy|
|    5114|       Magic|
|    5114|    Military|
+--------+------------+
only showing top 10 rows



In [39]:
## helps convert categorical string columns in a DataFrame into numerical indices
string_indexer = StringIndexer(inputCol='genre_item', outputCol='genre_index')
genre_indexed_df = string_indexer \
    .fit(genre_transform_df) \
    .transform(genre_transform_df) \
    .withColumn('genre_index', col('genre_index').cast('int'))
# genre_indexed_df    ## DataFrame => table collection of data

genre_indexed_df.select(['anime_id', 'genre_item', 'genre_index', 'rating']).show(10)
genre_indexed_df.columns  ## 14
# genre_indexed_df[['anime_id', 'genre_item', 'genre_index']].show(10)   ## change genre name to index: one hot

+--------+------------+-----------+------+
|anime_id|  genre_item|genre_index|rating|
+--------+------------+-----------+------+
|   32281|       Drama|          5|  9.37|
|   32281|     Romance|          8|  9.37|
|   32281|      School|          9|  9.37|
|   32281|Supernatural|         12|  9.37|
|    5114|      Action|          1|  9.26|
|    5114|   Adventure|          2|  9.26|
|    5114|       Drama|          5|  9.26|
|    5114|     Fantasy|          3|  9.26|
|    5114|       Magic|         16|  9.26|
|    5114|    Military|         23|  9.26|
+--------+------------+-----------+------+
only showing top 10 rows



['anime_id',
 'name',
 'genre',
 'type',
 'episodes',
 'rating',
 'members',
 'japanese_title',
 'aired',
 'image_url',
 'aired_from',
 'aired_to',
 'genre_item',
 'genre_index']

In [60]:
## DataFrame
pre_multihot_df = genre_indexed_df.groupby('anime_id') \
    .agg(collect_list('genre_index').alias('genre_indexes'),
         count('genre_index').alias('genre_count')  # Count of genres per anime_id
        )
# pre_multihot_df = genre_indexed_df.groupby('anime_id') \
#     .agg(collect_list('genre_index').alias('genre_indexes'))
    
## anime_id, genre_indexes
pre_multihot_df.show(10)
pre_multihot_df.columns   ## ['anime_id', 'genre_indexes']


+--------+--------------------+-----------+
|anime_id|       genre_indexes|genre_count|
+--------+--------------------+-----------+
|       1| [1, 2, 0, 5, 4, 25]|          6|
|       5|   [1, 5, 21, 4, 25]|          5|
|       6|           [1, 0, 4]|          3|
|       7|[1, 5, 16, 21, 32...|          6|
|       8|       [2, 3, 6, 12]|          4|
|      15|       [1, 0, 6, 20]|          4|
|      16|       [0, 5, 40, 8]|          4|
|      17|      [0, 6, 10, 20]|          4|
|      18|  [1, 37, 5, 19, 20]|          5|
|      19|[5, 26, 21, 32, 3...|          7|
+--------+--------------------+-----------+
only showing top 10 rows



['anime_id', 'genre_indexes', 'genre_count']

In [66]:
max_genre_index = genre_indexed_df \
    .agg(max('genre_index')).head()  ## .head() retrieves the first row of the result, which contains the maximum genre_index value.
print(max_genre_index)
max_genre_index = max_genre_index['max(genre_index)']
max_genre_index

# max_genre_index = genre_indexed_df \
#     .agg(max(col('genre_index'))).head()['max(genre_index)']

Row(max(genre_index)=42)


42

In [70]:
import numpy as np

@udf(returnType='array<int>') ## User Defined Function
def multihot_list(list, max_index):
    fill_array = np.zeros(max_index + 1, dtype=np.int32)
    for i in list:
        fill_array[i] = 1
    return fill_array.tolist()
    

multihot_df = pre_multihot_df \
    .withColumn(
        'genre_multihot',
        multihot_list(col('genre_indexes'), lit(max_genre_index))  ## lit() This tells PySpark to treat the literal value as a column.
    )

multihot_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- genre_indexes: array (nullable = false)
 |    |-- element: integer (containsNull = false)
 |-- genre_count: long (nullable = false)
 |-- genre_multihot: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [39]:
multihot_df.show(10)

+--------+--------------------+--------------------+
|anime_id|       genre_indexes|      genre_multihot|
+--------+--------------------+--------------------+
|       1| [1, 2, 0, 5, 4, 25]|[1, 1, 1, 0, 1, 1...|
|       5|   [1, 5, 21, 4, 25]|[0, 1, 0, 0, 1, 1...|
|       6|           [1, 0, 4]|[1, 1, 0, 0, 1, 0...|
|       7|[1, 5, 16, 21, 32...|[0, 1, 0, 0, 0, 1...|
|       8|       [2, 3, 6, 12]|[0, 0, 1, 1, 0, 0...|
|      15|       [1, 0, 6, 20]|[1, 1, 0, 0, 0, 0...|
|      16|       [0, 5, 40, 8]|[1, 0, 0, 0, 0, 1...|
|      17|      [0, 6, 10, 20]|[1, 0, 0, 0, 0, 0...|
|      18|  [1, 37, 5, 19, 20]|[0, 1, 0, 0, 0, 1...|
|      19|[5, 26, 21, 32, 3...|[0, 0, 0, 0, 0, 1...|
+--------+--------------------+--------------------+
only showing top 10 rows



In [40]:
##################################################

In [77]:
# Average Rating Min-Max Scale
rating_df = spark.read.csv(BASE_PATH + '/rating.csv', header=True, inferSchema=True) \
    .filter(col('rating') > 0)

                                                                                

In [78]:
ave_rating_df = rating_df \
    .groupby('anime_id') \
    .agg(mean('rating').alias('ave_rating'))

ave_rating_df.show(10)



+--------+-----------------+
|anime_id|       ave_rating|
+--------+-----------------+
|   24171|7.386666666666667|
|    9465|8.098352214212152|
|   17679|7.293103448275862|
|    1829|7.341757827235005|
|    8086|7.939071817474721|
|   17389|8.601839684625492|
|   22097| 8.13076923076923|
|   30654|8.687342833193629|
|    5300|8.694010416666666|
|    6336|8.497902097902099|
+--------+-----------------+
only showing top 10 rows



                                                                                

In [79]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline

In [80]:
vec_assembler = VectorAssembler(inputCols=['ave_rating'], outputCol='ave_rating_vec')   ## change to spark vector
## the MinMaxScaler is used to  normalize data to a specific range, usually [0, 1]
ave_rating_scaler = MinMaxScaler(inputCol='ave_rating_vec', outputCol='ave_rating_scaled')
print(type(ave_rating_scaler))
pipeline = Pipeline(stages=[vec_assembler, ave_rating_scaler])

rating_scaled_df = pipeline \
    .fit(ave_rating_df) \
    .transform(ave_rating_df)

rating_scaled_df.printSchema()

<class 'pyspark.ml.feature.MinMaxScaler'>


                                                                                

root
 |-- anime_id: integer (nullable = true)
 |-- ave_rating: double (nullable = true)
 |-- ave_rating_vec: vector (nullable = true)
 |-- ave_rating_scaled: vector (nullable = true)



In [81]:
rating_scaled_df.show(10)



+--------+-----------------+-------------------+--------------------+
|anime_id|       ave_rating|     ave_rating_vec|   ave_rating_scaled|
+--------+-----------------+-------------------+--------------------+
|   24171|7.386666666666667|[7.386666666666667]|[0.7096296296296296]|
|    9465|8.098352214212152|[8.098352214212152]| [0.788705801579128]|
|   17679|7.293103448275862|[7.293103448275862]|[0.6992337164750958]|
|    1829|7.341757827235005|[7.341757827235005]|[0.7046397585816672]|
|    8086|7.939071817474721|[7.939071817474721]|[0.7710079797194134]|
|   17389|8.601839684625492|[8.601839684625492]|[0.8446488538472768]|
|   22097| 8.13076923076923| [8.13076923076923]|[0.7923076923076922]|
|   30654|8.687342833193629|[8.687342833193629]|[0.8541492036881809]|
|    5300|8.694010416666666|[8.694010416666666]|[0.8548900462962962]|
|    6336|8.497902097902099|[8.497902097902099]|[0.8331002331002332]|
+--------+-----------------+-------------------+--------------------+
only showing top 10 

                                                                                

In [90]:
@udf(returnType='float')
def unwrap_list(rating):
    rating = rating.toArray()
    print(rating[0])
    return rating.tolist()[0] 
    # return rating.toArray().tolist()[0]    ## spark vector to numpy array
    ## .tolist() converts this numpy array into a Python list.
    
rating_scaled_df = rating_scaled_df \
    .withColumn('ave_rating_minmax', unwrap_list(col('ave_rating_scaled')))

rating_scaled_df.show(10)


# Creating a NumPy array
arr = np.array([0.75,2])
# Accessing the first element directly
first_element = arr[1]
print(first_element)  # Output: 2



+--------+-----------------+-------------------+--------------------+-----------------+
|anime_id|       ave_rating|     ave_rating_vec|   ave_rating_scaled|ave_rating_minmax|
+--------+-----------------+-------------------+--------------------+-----------------+
|   24171|7.386666666666667|[7.386666666666667]|[0.7096296296296296]|       0.70962965|
|    9465|8.098352214212152|[8.098352214212152]| [0.788705801579128]|        0.7887058|
|   17679|7.293103448275862|[7.293103448275862]|[0.6992337164750958]|        0.6992337|
|    1829|7.341757827235005|[7.341757827235005]|[0.7046397585816672]|       0.70463973|
|    8086|7.939071817474721|[7.939071817474721]|[0.7710079797194134]|       0.77100796|
|   17389|8.601839684625492|[8.601839684625492]|[0.8446488538472768]|       0.84464884|
|   22097| 8.13076923076923| [8.13076923076923]|[0.7923076923076922]|        0.7923077|
|   30654|8.687342833193629|[8.687342833193629]|[0.8541492036881809]|        0.8541492|
|    5300|8.694010416666666|[8.6

0.7096296296296296
0.788705801579128
0.6992337164750958
0.7046397585816672
0.7710079797194134
0.8446488538472768
0.7923076923076922
0.8541492036881809
0.8548900462962962
0.8331002331002332
0.733479009402728
                                                                                

In [94]:
rating_result_df = rating_scaled_df \
    .select(['anime_id', 'ave_rating_minmax'])
result_df = anime_df_parsed \
    .join(rating_result_df, on='anime_id')   ## join two dataframe

In [95]:
result_df.printSchema()


root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: string (nullable = true)
 |-- aired_to: integer (nullable = true)
 |-- ave_rating_minmax: float (nullable = true)



In [96]:
result_df.show(10)

24/08/12 00:03:39 ERROR RetryingBlockTransferor: Exception while beginning fetch of 1 outstanding blocks 
java.io.IOException: Failed to connect to wendys-laptop.lan/192.168.1.165:56382
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:294)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:131)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
	at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:151)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
	at org.apache.spark.storage.BlockManager.fetchRemoteMana

Py4JJavaError: An error occurred while calling o919.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 117.0 failed 1 times, most recent failure: Lost task 0.0 in stage 117.0 (TID 186) (wendys-laptop.lan executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:455)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:140)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:224)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:219)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
