# Proiect Data Engineering
_____

### Echipa

1. Benchia Madalin Codrin - 406 IS
2. Coteanu Sebastian - 406 IS
3. Coteanu Vlad - 406 IS
4. Dimisca Cosmin - 406 IS
5. Ghinea Alexandru Stefan - 407 IA
6. Giosanu Andrei - 406 IS
7. Nedelea Cosmin Daniel - 507 IA
8. Petrescu Ana-Maria - 406 IS

_____

### Setul de date

Setul de date folosit se poate gasi la adresa: <https://www.kaggle.com/lava18/google-play-store-apps>.
Setul contine informatii despre aplicatii Android publicate pe Google Play Store si este impartit in doua
fisiere CSV: unul cu informatii statistice despre aplicatii si unul cu cele mai relevante 100 de comentarii
lasate de utilizatori.

_____

### Descriere proiect

Proiectul curent prelucreaza setul de date si foloseste trei modele: doua pentru clasificare (dupa
numarul de instalari, `Installs`) si unul pentru regresie (dupa `Rating`). 95% din date sunt folosite
la antrenarea si testarea modelelor, iar 5% sunt pastrate pentru partea de Streaming. La finalul
procesului de antrenare, modelul RandomForestClassifier este salvat pentru a fi folosit, la randul sau,
in procesul de Streaming.

In [4]:
# IMPORT USED LIBRARIES

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from datetime import datetime
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
from sklearn.metrics import confusion_matrix
from pyspark.ml.feature import Normalizer

# Build a timestamp key (month-day-year-hour-minute-second) that keeps the
# names of files saved by this run unique, then obtain (or reuse) the Spark
# session for the whole notebook.

now = datetime.now()
unique_key = format(now, "%m-%d-%Y-%H-%M-%S")
spark = (
    SparkSession.builder
    .appName('google-play-store')
    .getOrCreate()
)

In [None]:
# Load both CSV files of the dataset (header row + inferred column types),
# print each schema for inspection and mark each frame for caching.

def _load_csv(path):
    """Read *path* as a CSV DataFrame, print its schema, cache it and return it."""
    frame = spark.read.csv(path, inferSchema=True, header=True)
    frame.printSchema()
    frame.cache()
    return frame


# Main file: statistics about the applications
df = _load_csv('Data/googleplaystore.csv')

# Reviews file: user reviews with their sentiment columns
df2 = _load_csv('Data/googleplaystore_user_reviews.csv')


### Vom clasifica aplicatiile dupa feature-ul Installs si vom face o regresie pentru a estima feature-ul Rating

In [None]:
# Print the first five rows of each dataset for a quick visual check
for preview_frame in (df, df2):
    preview_frame.show(5)

In [7]:
# Enrich the main dataframe with one aggregated sentiment score per app,
# computed from the reviews file.

# Drop reviews whose Sentiment is the literal string "nan" (missing value)
df2 = df2.filter(df2["Sentiment"] != "nan")

# Combine polarity and subjectivity into a single "Potential Score":
#     0.5 + 0.5 * polarity * (1.5 - subjectivity)
# The weight (1.5 - subjectivity) falls from 1.5 at subjectivity 0 to 0.5 at
# subjectivity 1, so more objective reviews push the score further from 0.5.
# NOTE(review): the stated intent was to give reviews near 0.5 subjectivity
# the most influence, which this formula does not do - confirm the intent.
df2 = df2.withColumn(
    "Potential Score",
    0.5 + 0.5 * df2["Sentiment_Polarity"] * (1.5 - df2["Sentiment_Subjectivity"]),
)

# Average the score per app, exposed under the name "avg score"
df2 = df2.groupBy("App").agg(F.avg("Potential Score").alias("avg score"))

# Inner join on App: only apps that have reviews are kept; App is then removed
df = df.join(df2, on="App").drop("App")

### Curatam datele


In [None]:
# SIZE feature -> numeric "Size parsed" column, expressed in MB.
# Raw values end in "k" (kilobytes), "M" (megabytes) or are the text
# "Varies with device" (whose last character is "e"):
#   * "k" values are divided by 1024 to convert them to MB;
#   * "M" values keep their number;
#   * "Varies with device" gets the average parsed size of its Category,
#     computed over the rows that are not "Varies with device".
# Any other format ends up null. Results are rounded to 2 decimals.

size_number = F.expr("substring(Size, 1, length(Size) - 1)").cast('double')

df_temp = df.withColumn("Size uom", F.expr("substring(Size, length(Size), 1)"))
df_temp = df_temp.withColumn(
    "Size parsed",
    F.when(F.col("Size uom") == "k", size_number / 1024)
     .when(F.col("Size uom") == "M", size_number),
)

# Average known size per category, excluding "Varies with device" rows
df_temp_avg_per_cat = (
    df_temp.filter(F.col("Size uom") != "e")
    .groupBy("Category")
    .agg(F.avg("Size parsed").alias("avg_size"))
    .withColumnRenamed("Category", "Category for avg")
)

# Inner join: rows whose Category is not among the averaged categories are dropped
df_temp = df_temp.join(
    df_temp_avg_per_cat,
    df_temp_avg_per_cat["Category for avg"] == df_temp["Category"],
)

filled_size = (
    F.when(F.col("Size uom") == "e", F.col("avg_size").cast('double'))
     .otherwise(F.col("Size parsed"))
)
df_temp = df_temp.withColumn("Size parsed", F.round(filled_size, 2))

df = df_temp.drop("Size", "Size uom", "Category for avg", "avg_size")

In [10]:
# PRICE feature -> double.
# Paid apps may carry a leading "$" (e.g. "$4.99"); free apps are "0".
# Only a *leading* "$" is removed, so a value without the symbol keeps all of
# its digits instead of losing its first character. Whatever is not a number
# afterwards (e.g. a malformed row) becomes null after the cast.
# The expression uses this frame's own "Price" column rather than a column
# from a frame built in an earlier cell.
df = df.withColumn(
    "Price",
    F.regexp_replace(F.col("Price"), r"^\$", "").cast('double'),
)

In [11]:
# RATING feature - replace missing ratings (NaN or null) with the fixed
# placeholder 2.5 (a constant, not the dataset average).
# The column is cast to double first, so the check uses a real NaN test
# instead of comparing against the string "NaN", and the result stays numeric.
# NOTE(review): Rating is also the regression label further down, so these
# imputed values become training targets - consider dropping such rows.

rating = F.col("Rating").cast('double')
df = df.withColumn(
    "Rating",
    F.when(rating.isNull() | F.isnan(rating), F.lit(2.5)).otherwise(rating),
)

In [12]:
# INSTALLS feature - the classification label. Many raw buckets are close to
# each other, so:
#   * every value below 100 installs (including "0") is merged into "10";
#   * every value above 100,000,000 is capped to "100000000";
#   * all other values keep their number.
# Both "+" and "," (thousands separators, presumably present in values such as
# "10,000+") are removed before the numeric cast. Removing only the "+" would
# make the cast return null for comma-formatted values, so neither rule would
# ever apply to them. Labels are emitted without commas as well, which keeps
# the cap label and an already-existing 100,000,000 bucket in the same class.
# Values that are not numbers at all keep their text (minus "+" and ",").

installs_count = F.regexp_replace(F.col("Installs"), "[+,]", "").cast('double')

df = df.withColumn(
    "Installs",
    F.when(installs_count < 100, F.lit("10"))
     .when(installs_count > 100000000, F.lit("100000000"))
     .otherwise(F.regexp_replace(F.col("Installs"), "[+,]", "")),
)

### Pregatim datele pentru antrenare

In [None]:
# Cast the numeric features used by the models to double; every other column
# is passed through unchanged and the column order is preserved.

features_to_cast_to_double = ['Rating', 'Reviews', 'Price', 'avg score']

for feature in features_to_cast_to_double:
    print(f"Casting {feature} to double")

df = df.select(*[
    F.col(name).cast('double').alias(name)
    if name in features_to_cast_to_double
    else F.col(name)
    for name in df.columns
])

In [None]:
# INDEXING categorical features: each listed string column is replaced by a
# "<name> indexed" column produced by a StringIndexer fitted on the current
# data; the original string column is dropped right after.

features_to_index = ['Category', 'Type', 'Content Rating', 'Genres', 'Android Ver', 'Installs']

df_indexed = df
df_indexed.cache()
for feature in features_to_index:
    print(f"Indexing: {feature}")
    fitted_indexer = StringIndexer(inputCol=feature, outputCol=f"{feature} indexed").fit(df_indexed)
    df_indexed = fitted_indexer.transform(df_indexed).drop(feature)

In [None]:
# DROP columns that are not used as model inputs
# (App, Last Updated, Current Ver and the indexed Android version).

featres_to_drop = ['App', 'Last Updated', 'Current Ver', 'Android Ver indexed']

for feature in featres_to_drop:
    print(f"Dropping: {feature}")
# drop() silently ignores names that are not present (e.g. App, already removed)
df_indexed = df_indexed.drop(*featres_to_drop)

In [None]:
# Split the prepared data: 95% for training/testing the models and 5% held
# out for the Streaming part. A fixed seed makes the split reproducible across
# runs (given the same input data and partitioning), so the held-out rows do
# not change from one run to the next.
# NOTE(review): streaming_data is not written anywhere in this notebook -
# confirm how the Streaming part obtains these rows.

SPLIT_SEED = 42
train_test_data, streaming_data = df_indexed.randomSplit([0.95, 0.05], seed=SPLIT_SEED)


### Vectorizam datele si le clasificam folosind doi clasificatori
1. Un RandomForestClassifier, pe care il vom si salva pentru a fi folosit mai tarziu
2. Un MultilayerPerceptronClassifier



In [None]:
# Vectorize: every column except the label goes into the "features" vector;
# the classification label is "Installs indexed".

feature_cols = [name for name in train_test_data.columns if name != 'Installs indexed']

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="error")
final_data = assembler.transform(train_test_data).select("features", "Installs indexed")

# 70% train / 30% test
train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [None]:
# Random forest classifier on "Installs indexed". This is the model that is
# saved for later use in the Streaming part.
# NOTE(review): maxBins=120 is presumably chosen to cover the number of
# distinct indexed categorical values (e.g. Genres) - confirm.

rf = RandomForestClassifier(labelCol="Installs indexed", featuresCol="features", numTrees=32, maxBins=120)
model = rf.fit(train_data)

predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="Installs indexed", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
randomForestError = 1.0 - accuracy
print("Random forest test Error = %g" % randomForestError)

# Save just the random forest model; the directory name carries the run key
# and the rounded test error. "/" is used as separator because Spark/Hadoop
# paths accept it on every OS, whereas "\\" only separates folders on Windows.
print("Saving the model")
strErr = str(round(randomForestError, 2))
model.save("OutputGStore/" + unique_key + "-" + strErr + "-randomForestModel")

In [None]:
# MultilayerPerceptronClassifier on the same train/test split, followed by a
# confusion matrix of its test predictions.

# Layer sizes are derived from the data rather than hard-coded:
#   * input layer = length of the assembled "features" vector, so it does not
#     depend on whatever the global `feature_cols` holds when this cell runs;
#   * output layer = number of label classes (highest label index + 1). A
#     fixed 10 outputs fails during training as soon as a label index is >= 10.
num_inputs = train_data.first()["features"].size
num_classes = int(
    train_data.union(test_data).agg(F.max("Installs indexed")).first()[0]
) + 1
layers = [num_inputs, 25, num_classes]

trainer = MultilayerPerceptronClassifier(layers=layers, labelCol="Installs indexed", featuresCol="features", blockSize=500, seed=10003, maxIter=1000)
model = trainer.fit(train_data)
result = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="Installs indexed", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(result)
print("Perceptron test Error = %g" % (1.0 - accuracy))
deepLearningError = (1.0 - accuracy)

# Confusion matrix for evaluating the quality of the predictions. Collect once
# so that actual and predicted values come from the same materialized rows
# (and only one Spark job is run).
label_rows = result.select("Installs indexed", "prediction").collect()
actual = [int(row['Installs indexed']) for row in label_rows]
predicted = [int(row['prediction']) for row in label_rows]
conf = confusion_matrix(actual, predicted)
print(conf)


### Vectorizam datele si facem o regresie pe baza Rating-ului folosind un LinearRegression



In [None]:
# Linear regression predicting Rating from a hand-picked subset of features.
# All intermediate results use regression-specific names, so the globals used
# by the classification cells (feature_cols, final_data, train_data,
# test_data) stay intact if those cells are re-run after this one.
# NOTE(review): missing ratings were replaced with 2.5 earlier in the
# notebook, so those placeholder values are used here as regression targets.

regression_feature_cols = ['avg score', 'Category indexed', 'Genres indexed', 'Installs indexed',
                           'Size parsed', 'Price', 'Type indexed', 'Reviews']

regression_assembler = VectorAssembler(inputCols=regression_feature_cols, outputCol="features", handleInvalid="error")
regression_data = regression_assembler.transform(train_test_data).select("features", "Rating")

# 70% train / 30% test
regression_train_data, regression_test_data = regression_data.randomSplit([0.7, 0.3])

lr = LinearRegression(featuresCol="features", labelCol='Rating', regParam=0.1)
lrModel = lr.fit(regression_train_data)
test_results = lrModel.evaluate(regression_test_data)

print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("R2: {}".format(test_results.r2))
linearRegressionR2 = test_results.r2



### Analizam rezultatele si oprim contextul de spark


In [None]:
# Print the metrics of the three models, then shut down the Spark session.
print("Script results: ")
summary_rows = (
    ("Random forest classifier error", randomForestError),
    ("Neural networks classifier error", deepLearningError),
    ("Linear regression R2", linearRegressionR2),
)
for label, value in summary_rows:
    print(label, value)
spark.stop()