In [1]:
import pyspark
spark = (pyspark
        .sql
        .SparkSession.builder
        .master("local[*]")
        .appName("Recomendador")
        .config("spark.driver.memory", "4g")
        .config("spark.executor.memory", "8g")
        .getOrCreate()
)
spark

In [6]:
rawUserArtistData = spark.read.text("ds/user_artist_data.txt")
rawUserArtistData.show(5)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
+-------------------+
only showing top 5 rows



In [16]:
from pyspark.sql.functions import split

split_col = split(rawUserArtistData.value, " ")

userArtistDf = (rawUserArtistData
                .withColumn("userid", split_col.getItem(0))
                .withColumn("artistid", split_col.getItem(1))
                .withColumn("playcount", split_col.getItem(2))
                .drop(rawUserArtistData.value)
               )
userArtistDf.show(5)

+-------+--------+---------+
| userid|artistid|playcount|
+-------+--------+---------+
|1000002|       1|       55|
|1000002| 1000006|       33|
|1000002| 1000007|        8|
|1000002| 1000009|      144|
|1000002| 1000010|      314|
+-------+--------+---------+
only showing top 5 rows



In [19]:
rawArtistData = spark.read.text("ds/artist_data.txt")
split_col = split(rawArtistData.value, '\t')
artistData = (rawArtistData
.withColumn("artistid", split_col.getItem(0))
.withColumn("artist_name", split_col.getItem(1))
.drop("value"))
artistData.show(5)

+--------+--------------------+
|artistid|         artist_name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
|10151459| The Flaming Sidebur|
| 6826647|   Bodenstandig 3000|
+--------+--------------------+
only showing top 5 rows



In [22]:
rawArtistAlias = spark.read.text("ds/artist_alias.txt")
split_col = split(rawArtistAlias.value, '\t')
artistAlias = (rawArtistAlias
               .withColumn("badid", split_col.getItem(0))
               .withColumn("goodid", split_col.getItem(1))
               .drop("value")
)
artistAlias.show(5)

+--------+-------+
|   badid| goodid|
+--------+-------+
| 1092764|1000311|
| 1095122|1000557|
| 6708070|1007267|
|10088054|1042317|
| 1195917|1042317|
+--------+-------+
only showing top 5 rows



In [24]:
artistData.filter(artistData["artistid"].isin("1092764", "1000311")).show()

+--------+--------------+
|artistid|   artist_name|
+--------+--------------+
| 1000311| Steve Winwood|
| 1092764|Winwood, Steve|
+--------+--------------+



In [25]:
userArtistDf.count()

24296858

In [29]:
artistAlias.select("goodid").distinct().count()

22478

In [32]:
from pyspark.sql.functions import count,avg

In [39]:
(userArtistDf
 .groupBy("userid")
 .agg(count("artistid").alias("artistCount"))
 .agg(avg("artistCount").alias("Average Artists per User"))
).show(5)

+------------------------+
|Average Artists per User|
+------------------------+
|      164.04492576513562|
+------------------------+



Para a próxima aula: 
- Realizar o .join entre artistData e artistAlias para manter apenas os goodid's.
- Realizar o .join entre userArtistDf e artistAlias para manter apenas os goodid's.