<a href="https://colab.research.google.com/github/rawar/ix-ml-pydeequ/blob/main/pipeline_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PyDeequ ML Pipeline

This notebook shows how to use PyDeequ to check data quality within a simple machine learning pipeline.

## Installing Java, Hadoop, Spark, and PyDeequ

In [1]:
# Bring the notebook environment up to date
!apt-get -qq update

In [2]:
# Install Java
!apt-get -qq install -y openjdk-8-jdk-headless
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

Selecting previously unselected package openjdk-8-jre-headless:amd64.
(Reading database ... 155229 files and directories currently installed.)
Preparing to unpack .../openjdk-8-jre-headless_8u312-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u312-b07-0ubuntu1~18.04) ...
Selecting previously unselected package openjdk-8-jdk-headless:amd64.
Preparing to unpack .../openjdk-8-jdk-headless_8u312-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jdk-headless:amd64 (8u312-b07-0ubuntu1~18.04) ...
Setting up openjdk-8-jre-headless:amd64 (8u312-b07-0ubuntu1~18.04) ...
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/orbd to provide /usr/bin/orbd (orbd) in auto mode
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/servertool to provide /usr/bin/servertool (servertool) in auto mode
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/tnameserv to provide /usr/bin/tnameserv (tnameserv) in auto mode
Setting up ope

In [7]:
# Download Spark and Hadoop. Other versions are available at https://archive.apache.org/dist/spark/
!wget  https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz

--2022-01-30 14:51:19--  https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 227893062 (217M) [application/x-gzip]
Saving to: ‘spark-2.4.0-bin-hadoop2.7.tgz’


2022-01-30 14:51:29 (22.9 MB/s) - ‘spark-2.4.0-bin-hadoop2.7.tgz’ saved [227893062/227893062]



In [8]:
# Install Spark and Hadoop
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [9]:
# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
os.environ["SPARK_VERSION"] = "2.4.0"

In [10]:
# Initialize Spark
import findspark
findspark.init()

In [11]:
# Install PyDeequ
!pip install pydeequ



## Loading the sample dataset
This notebook uses Amazon video game reviews from 2014, which UCSD provides for testing purposes and which can be downloaded [here](http://deepyeti.ucsd.edu/jianmo/amazon/).

In [12]:
# Download the review data
!wget http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Video_Games.json.gz

--2022-01-30 14:51:53--  http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Video_Games.json.gz
Resolving deepyeti.ucsd.edu (deepyeti.ucsd.edu)... 169.228.63.50
Connecting to deepyeti.ucsd.edu (deepyeti.ucsd.edu)|169.228.63.50|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 522823613 (499M) [application/octet-stream]
Saving to: ‘Video_Games.json.gz’


2022-01-30 14:52:03 (48.7 MB/s) - ‘Video_Games.json.gz’ saved [522823613/522823613]



In [13]:
# Unpack the review data
!gunzip Video_Games.json.gz

In [14]:
import os
import json
import gzip
import pandas as pd
import pydeequ
from time import time
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit,CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline, PipelineModel

In [15]:
# Configure and initialize the Spark session.
# Memory settings go into the builder: changing the conf of an already running context has no effect.
spark = (SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

In [16]:
# Read the review data into Spark
ddf = spark.read.json("Video_Games.json")

In [17]:
# Count the number of records
ddf.count()

2565349

In [18]:
# Show three sample records
ddf.show(n=3)

+----------+-----+-------+--------------------+-----------+--------------+---------------+-----+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|   reviewerName|style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+---------------+-----+--------------------+--------------+--------+----+
|0439381673| null|    1.0|I used to play th...| 06 9, 2014|A21ROB4YDOZA5P|  Mary M. Clark| null|   Did not like this|    1402272000|    true|null|
|0439381673| null|    3.0|The game itself w...|05 10, 2014|A3TNZ2Q5E7HTHD|      Sarabatya| null|      Almost Perfect|    1399680000|    true|null|
|0439381673| null|    4.0|I had to learn th...| 02 7, 2014|A1OKRM3QFEATQO|Amazon Customer| null|DOES NOT WORK WIT...|    1391731200|    true|  15|
+----------+-----+-------+--------------------+-----------+--------------+---------------+-----+--------------------+--------------+--------+----+

## Selecting the raw data

In [19]:
# Drop columns that are not needed
ddf = ddf.drop('helpful','reviewText','reviewTime','reviewerName','unixReviewTime','summary')

In [20]:
# Rename columns
ddf = ddf.selectExpr("asin","overall as rating", "reviewerID as user")

In [21]:
# Show three sample records
ddf.show(n=3)

+----------+------+--------------+
|      asin|rating|          user|
+----------+------+--------------+
|0439381673|   1.0|A21ROB4YDOZA5P|
|0439381673|   3.0|A3TNZ2Q5E7HTHD|
|0439381673|   4.0|A1OKRM3QFEATQO|
+----------+------+--------------+
only showing top 3 rows



In [22]:
# Shrink the dataset to a sample of roughly 1%
ddf = ddf.sample(0.01)

In [23]:
print(f"Dataset size: {ddf.count()}")

Dataset size: 25449
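
The count above is close to, but not exactly, 1% of the 2,565,349 records. A minimal sketch of why, assuming that sampling without replacement behaves like an independent coin flip per row (the helper `bernoulli_sample` and the toy rows are hypothetical and do not use Spark):

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100_000))  # hypothetical stand-in for the review records
sampled = bernoulli_sample(rows, 0.01, seed=42)

# The sample size varies around the expected value instead of hitting it exactly.
print(f"Expected about {int(len(rows) * 0.01)} rows, got {len(sampled)}")
```

Because every row is decided on its own, the fraction is a probability and not a guaranteed share of the data.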


## Verifying the data

As a first step, PyDeequ can compute a number of statistics for the dataset to be processed. Deequ supports a range of such statistical metrics. A good overview can be found [here](https://aws.amazon.com/de/blogs/big-data/test-data-quality-at-scale-with-deequ/). The source code of the metrics is available [here](https://github.com/awslabs/deequ/tree/master/src/main/scala/com/amazon/deequ/analyzers).

In [18]:
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.checks import *
from pydeequ.verification import *
from pydeequ.repository import *

In [19]:
# Correlation needs two numeric columns; ddf only has one (rating), so no Correlation analyzer is added.
analysisResult = AnalysisRunner(spark) \
  .onData(ddf) \
  .addAnalyzer(Size()) \
  .addAnalyzer(Completeness("user")) \
  .addAnalyzer(ApproxCountDistinct("user")) \
  .addAnalyzer(Mean("rating")) \
  .addAnalyzer(Compliance("top rating", "rating >= 4.0")) \
  .run()

In [20]:
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+----------+-------------------+------------------+
| entity|  instance|               name|             value|
+-------+----------+-------------------+------------------+
| Column|    rating|               Mean|4.0189240577989995|
| Column|      user|       Completeness|               1.0|
| Column|      user|ApproxCountDistinct|          250142.0|
|Dataset|         *|               Size|          256129.0|
| Column|top rating|         Compliance|0.7400294382908612|
+-------+----------+-------------------+------------------+



The result shows that the analyzed dataset contains about 256,000 records with an average rating of roughly 4 stars, and that about 74% of the ratings are 4.0 or higher.
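
To make the metrics more concrete, the following sketch computes the same kinds of values in plain Python over a handful of hypothetical records. It is not how Deequ implements them (Deequ runs on Spark, and `ApproxCountDistinct` is an approximation, whereas the distinct count here is exact); it only illustrates what each number means.

```python
# Hypothetical toy records with the same columns as the notebook's DataFrame
records = [
    {"user": "u1", "rating": 5.0},
    {"user": "u2", "rating": 3.0},
    {"user": None, "rating": 4.0},
    {"user": "u1", "rating": 1.0},
]

size = len(records)                                                      # Size
completeness_user = sum(r["user"] is not None for r in records) / size   # Completeness("user")
mean_rating = sum(r["rating"] for r in records) / size                   # Mean("rating")
compliance_top = sum(r["rating"] >= 4.0 for r in records) / size         # Compliance("rating >= 4.0")
distinct_users = len({r["user"] for r in records if r["user"] is not None})  # exact distinct count

print(size, completeness_user, mean_rating, compliance_top, distinct_users)
```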

### Generating automatic constraint suggestions

In [21]:
suggestionResult = ConstraintSuggestionRunner(spark) \
  .onData(ddf) \
  .addConstraintRule(DEFAULT()) \
  .run()

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))


{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(rating,None))",
      "column_name": "rating",
      "current_value": "Completeness: 1.0",
      "description": "'rating' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"rating\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('rating' has no negative values,rating >= 0,None))",
      "column_name": "rating",
      "current_value": "Minimum: 1.0",
      "description": "'rating' has no negative values",
      "suggesting_rule": "NonNegativeNumbersRule()",
      "rule_description": "If we see only non-negative numbers in a column, we suggest a corresponding constraint",
      "code_for_constraint": ".isNonNegative(\"rating\")"
    },
    {
      "constraint_name": "CompletenessConstraint(Completeness(use

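The two rule descriptions in the output above can be read as simple predicates over a column. The sketch below mimics them in plain Python; the function `suggest_constraints` is hypothetical and only illustrates the idea, it is not Deequ's implementation.

```python
def suggest_constraints(column, values):
    """Return constraint snippets for one column, mimicking two of the rules shown above."""
    suggestions = []
    non_null = [v for v in values if v is not None]

    # "If a column is complete in the sample, we suggest a NOT NULL constraint"
    if values and len(non_null) == len(values):
        suggestions.append(f'.isComplete("{column}")')

    # "If we see only non-negative numbers in a column, we suggest a corresponding constraint"
    if non_null and all(isinstance(v, (int, float)) and v >= 0 for v in non_null):
        suggestions.append(f'.isNonNegative("{column}")')

    return suggestions

print(suggest_constraints("rating", [1.0, 3.0, 5.0]))
print(suggest_constraints("user", ["A21ROB4YDOZA5P", None]))
```

Suggestions derived this way describe the sample that was profiled, so they are a starting point for a check and not a guarantee about future data.
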
### Testing the data

In [22]:
check = Check(spark, CheckLevel.Warning, "Video Game Review Check")

In [23]:
checkResult = VerificationSuite(spark) \
  .onData(ddf) \
  .addCheck(
    check.hasSize(lambda x: x >= 2000000) \
      .hasMin("rating", lambda x: x == 1.0) \
      .hasMax("rating", lambda x: x == 5.0)  \
      .isComplete("user")  \
      .isUnique("user")  \
  ).run()

Python Callback server started!


In [24]:
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
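
Conceptually, each constraint in the check is an assertion over one metric of the data. The following plain-Python sketch evaluates the same five constraints on hypothetical toy records; the function `evaluate_constraints` and the `min_size` parameter are assumptions made for illustration and are not part of PyDeequ.

```python
from collections import Counter

def evaluate_constraints(records, min_size):
    """Evaluate toy versions of the constraints used in the check above."""
    ratings = [r["rating"] for r in records]
    users = [r["user"] for r in records]
    user_counts = Counter(u for u in users if u is not None)
    return {
        "hasSize": len(records) >= min_size,
        "hasMin(rating)": min(ratings) == 1.0,
        "hasMax(rating)": max(ratings) == 5.0,
        "isComplete(user)": all(u is not None for u in users),
        # unique means that no value occurs more than once
        "isUnique(user)": all(count == 1 for count in user_counts.values()),
    }

records = [
    {"user": "u1", "rating": 5.0},
    {"user": "u2", "rating": 1.0},
    {"user": "u1", "rating": 3.0},  # u1 reviewed a second game
]
results = evaluate_constraints(records, min_size=3)
for name, passed in results.items():
    print(f"{name}: {'Success' if passed else 'Failure'}")
```

In this toy data the uniqueness constraint fails as soon as one user has written more than one review, which is worth keeping in mind when requiring `isUnique("user")` on review data.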



## Preparing the data for collaborative filtering with Alternating Least Squares (ALS)

[Source](https://github.com/craigmacartney/Spark-ALS-Recommendation/blob/master/AmazonRecommendationSystem.ipynb)

In [24]:
# Convert the asin and user columns from strings to numeric indices using StringIndexer
asinIndexer = StringIndexer(inputCol="asin", outputCol="item", handleInvalid='error')
userIndexer = StringIndexer(inputCol='user', outputCol='userid', handleInvalid='error')
asinIndexed = asinIndexer.fit(ddf).transform(ddf)
userIndexed = userIndexer.fit(asinIndexed).transform(asinIndexed)
ddf_indexed = userIndexed.drop('asin').drop('user')
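
ALS expects numeric user and item ids, which is why the string columns are indexed first. A minimal sketch of the idea behind such an index, assuming labels are ordered by descending frequency so that the most frequent label gets index 0.0 (the alphabetical tie-break and the helper `fit_string_index` are assumptions added here to keep the sketch deterministic):

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (assumption)
    labels = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(labels)}

asins = ["B", "A", "B", "C", "B", "A"]  # hypothetical product ids
mapping = fit_string_index(asins)
indexed = [mapping[a] for a in asins]

print(mapping)
print(indexed)
```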

In [25]:
# 80:20 split into training and test data
(ddf_train, ddf_test) = ddf_indexed.randomSplit([0.8,0.2])
ddf_train.cache() 
ddf_test.cache()

DataFrame[rating: double, item: double, userid: double]

In [26]:
# Size of the training and test sets
print(f"Train set size: {ddf_train.count()}")
print(f"Test set size: {ddf_test.count()}")

Train set size: 20405
Test set size: 5044


In [28]:
print('Matrix size, percentage of matrix filled and number of distinct users and items:')
ddf_train.createOrReplaceTempView('ddf_train')
spark.sql("""
      SELECT *, 100 * rating/matrix_size AS percentage
        FROM (
          SELECT userid, item, rating, userid * item AS matrix_size
            FROM(
              SELECT COUNT(*) AS rating, COUNT(DISTINCT(item)) AS item, COUNT(DISTINCT(userid)) AS userid
                FROM ddf_train
                )
            )
""").show()

Matrix size, percentage of matrix filled and number of distinct users and items:
+------+-----+------+-----------+--------------------+
|userid| item|rating|matrix_size|          percentage|
+------+-----+------+-----------+--------------------+
|184102|32420|204695| 5968586840|0.003429538775044446|
+------+-----+------+-----------+--------------------+
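
The numbers in this table can be reproduced by hand: the user-item matrix has one cell per (user, item) pair, and the percentage is the share of cells that actually hold a rating. A short worked example with the values from the output above:

```python
num_users = 184_102     # distinct users (column "userid" in the output)
num_items = 32_420      # distinct items (column "item")
num_ratings = 204_695   # observed ratings (column "rating")

matrix_size = num_users * num_items
percentage = 100 * num_ratings / matrix_size

print(f"matrix_size = {matrix_size}")
print(f"percentage  = {percentage:.6f} %")
```

Only about 0.0034% of the matrix is filled, so the rating matrix is extremely sparse.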



## Training the recommender model

In [27]:
# Create the model
als = ALS(userCol="userid", itemCol="item", ratingCol="rating",coldStartStrategy='drop',nonnegative=False)

In [28]:
# Create the evaluator
rmseevaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [29]:
# Create the parameter grid
paramGrid = ParamGridBuilder() \
  .addGrid(als.rank, [1,5,10,50,70]) \
  .addGrid(als.maxIter,[15]) \
  .addGrid(als.regParam,[0.05,0.1,0.5,5]) \
  .build()
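
The grid is the Cartesian product of the listed values, so every combination is trained and evaluated once. A small sketch in plain Python that enumerates the same combinations (the dictionary keys simply mirror the ALS parameter names):

```python
from itertools import product

ranks = [1, 5, 10, 50, 70]
max_iters = [15]
reg_params = [0.05, 0.1, 0.5, 5]

# One dictionary per combination of rank, maxIter and regParam
grid = [
    {"rank": rank, "maxIter": max_iter, "regParam": reg_param}
    for rank, max_iter, reg_param in product(ranks, max_iters, reg_params)
]

print(f"{len(grid)} parameter combinations")
print(grid[0])
```

With 5 x 1 x 4 = 20 candidate models, the grid size directly drives the training time measured in the next cell.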

In [None]:
# Split into training and validation data
tvs = TrainValidationSplit(estimator=als,
  estimatorParamMaps=paramGrid,
  evaluator=rmseevaluator,
  trainRatio=0.8)

# Train the model and measure the training time
startTime = time()
tvsmodel = tvs.fit(ddf_train)
endTime = time()
measured_execution_time = (endTime-startTime)

print(f"Measured training time: {measured_execution_time}")

In [None]:
# Predict and evaluate using the test set
predictions = tvsmodel.transform(ddf_test)
testset_rmse = rmseevaluator.evaluate(predictions)
print(f"Test set RMSE: {testset_rmse}") 
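
The evaluator reports the root mean squared error (RMSE), i.e. the square root of the average squared difference between predicted and actual ratings. A minimal sketch of that formula in plain Python with hypothetical values (the helper `rmse` is illustrative and not part of Spark):

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between true and predicted ratings."""
    squared_errors = [(p - y) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

labels = [4.0, 3.0, 5.0, 1.0]        # hypothetical true ratings
predictions = [3.0, 4.0, 4.0, 2.0]   # hypothetical model predictions

print(rmse(labels, predictions))
```

An RMSE of 1.0 would mean that predictions are off by about one star on average, with larger errors weighted more heavily.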

## Applying the recommender model

In [None]:
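from pyspark.sql.functions import lit

def recommendVideoGames(model, user, num_rec):
    # Pair every known item with the given user id
    itemsuser = ddf_train.select("item").distinct().withColumn("userid", lit(user))
    # Items the user has already rated
    gamesrated = ddf_train.filter(ddf_train.userid == user).select("item", "userid")
    # Score only the unrated items and keep the num_rec highest predictions
    predictions = model.transform(itemsuser.subtract(gamesrated)).dropna().orderBy("prediction", ascending=False).limit(num_rec).select("item", "prediction")
    predictions.show()
    # Map item indices back to the original asin strings (uses the StringIndexer metadata of the "item" column)
    converter = IndexToString(inputCol="item", outputCol="originalCategory")
    converted = converter.transform(predictions)
    converted.show()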

In [None]:
recommendVideoGames(tvsmodel, 696, 3)
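
The selection logic of the function can be summarized without Spark: take all items, remove those the user has already rated, and return the highest-scoring remainder. A minimal sketch under the assumption that predicted scores are already available as a dictionary (the function `recommend`, the item names, and the scores are hypothetical):

```python
def recommend(predicted_scores, already_rated, num_rec):
    """Return the num_rec best-scoring items the user has not rated yet."""
    candidates = {
        item: score
        for item, score in predicted_scores.items()
        if item not in already_rated
    }
    ranked = sorted(candidates.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:num_rec]

predicted_scores = {"game_a": 4.7, "game_b": 3.9, "game_c": 4.9, "game_d": 4.2}
already_rated = {"game_c"}

print(recommend(predicted_scores, already_rated, num_rec=2))
```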