# Import

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Reuse the active Spark session if one exists; otherwise start a new one.
spark_session = (
    SparkSession
    .builder
    .getOrCreate()
)

In [None]:
from pyspark.sql.functions import *

# Read Data

In [None]:
# Load the pipe-delimited item catalogue; the first line holds the column
# names and column types are inferred from the data.
items = spark_session.read.csv(
    "items.csv",
    sep="|",
    header=True,
    inferSchema=True,
)

In [None]:
# items.show(3)

# Data Preprocessing

In [None]:
# items.schema

In [None]:
# Remove every "[" and "]" character from the "subtopics" string.
# translate() deletes any character of the matching set that has no
# counterpart in the replacement string, so a single call with an empty
# replacement strips both brackets in one pass over the column.
items = items.withColumn("subtopics", translate(items["subtopics"], "[]", ""))

### Delete Item 62676

In [None]:
# items.select([count(when(isnull('main topic'), True))]).show()
# items.select([count(when(isnull('subtopics'), True))]).show()
# items.select([count(when(isnull('subtopics') & isnull('main topic'), True))]).show()
# items.select([count(when(items["subtopics"] == "", True))]).show()

In [None]:
# items.where((isnull('subtopics') & isnull('main topic'))).show()

In [None]:
# items.where("itemID = 62676").show()

In [None]:
# Keep every item except the one with itemID 62676.
items = items.where(col("itemID") != 62676)

In [None]:
# items.where((isnull('subtopics') & isnull('main topic'))).show()

In [None]:
# items.show(3)

### Combine Main Topic and Subtopics

In [None]:
# Build one comma-separated "topics" string per item:
#   - main topic is NULL            -> subtopics alone
#   - subtopics is NULL or empty    -> main topic alone
#   - otherwise                     -> "<main topic>,<subtopics>"
# The explicit isnull check on subtopics is required: `subtopics == ""`
# evaluates to NULL (not true) when subtopics is NULL, so such a row would
# otherwise fall through to concat(), which yields NULL as soon as any of
# its inputs is NULL, and the item's main topic would be lost.
items = items.withColumn(
    "topics",
    when(isnull("main topic"), items["subtopics"])
    .when(
        isnull("subtopics") | (items["subtopics"] == ""),
        items["main topic"],
    )
    .otherwise(concat(col("main topic"), lit(","), col("subtopics"))),
)

In [None]:
# items.show(3)
# items.where(isnull('main topic')).show(5)
# items.where(items["subtopics"] == "").show(3)

In [None]:
# Keep only the identifying columns plus the combined "topics" string.
items = items.select(
    [
        "itemID",
        "title",
        "author",
        "publisher",
        "topics",
    ]
)

In [None]:
# items.show(3)

In [None]:
# Turn the comma-separated string into an array and drop repeated entries.
items = items.withColumn("topics", array_distinct(split(items["topics"], ",")))

In [None]:
# items.select("topics").show(5, False)
# items.show(3)

### Explode into separate rows

In [None]:
from pyspark.sql.functions import explode

In [None]:
# Produce one output row per element of the "topics" array.
items = items.withColumn(
    "topics_splitted",
    explode(col("topics")),
)

In [None]:
# items.show(5)

In [None]:
# The array column has been exploded into "topics_splitted"; discard it.
items = items.drop(items["topics"])

In [None]:
# items.show(3)

### Pivot

In [None]:
from pyspark.sql.functions import sum

In [None]:
# Set the maximum number of distinct pivot values Spark will accept to 25000.
spark_session.conf.set(
    key="spark.sql.pivotMaxValues",
    value=25000,
)

In [None]:
# One row per item, one column per distinct topic; each cell holds the number
# of rows for that (item, topic) pair.
pivot = (
    items
    .groupBy(col("itemID"))
    .pivot("topics_splitted")
    .count()
)

In [None]:
# Topic columns with no matching row for an item come back as NULL; use 0.
pivot = pivot.na.fill(0)

In [None]:
# pivot.show()

### Dimensionality Reduction

In [None]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

In [None]:
# Convert each row of topic counts (itemID removed) into a dense MLlib vector.
mat = (
    pivot
    .drop("itemID")
    .rdd
    .map(lambda row: Vectors.dense(row))
)

In [None]:
# Wrap the RDD of vectors in a distributed row-oriented matrix.
mat = RowMatrix(rows=mat)

### Principal component analysis (PCA)

In [None]:
# Compute the top 5 principal components of the item/topic matrix.
pca = mat.computePrincipalComponents(k=5)

In [None]:
# Project every row onto the principal components computed above.
projected = mat.multiply(matrix=pca)

In [None]:
# print(projected.rows.collect())