In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession \
    .builder \
    .appName("Spark Task") \
    .getOrCreate()

### Part 1: Spark RDD API

#### Task 1

In [None]:
#link = "https://raw.githubusercontent.com/stedy/Machine-Learning-with-R-datasets/master/groceries.csv"

In [None]:
with open('groceries.csv', 'r') as csvfile:
    csvtext = csvfile.readlines()

In [None]:
csv = []
for i in csvtext:
    line = i.replace("\n", "")
    add = line.split(',')
    csv.extend(add)

In [None]:
type(rdd)

In [None]:
rdd=spark.sparkContext.parallelize(csv)

In [None]:
rdd.collect()

#### Task 2

a)

In [None]:
rddUnique = rdd.distinct()

In [None]:
rddUnique.collect()

In [None]:
rddUnique.coalesce(1).saveAsTextFile("test4_out/out_1_2a.txt")

b)

In [None]:
rddUnique.count()

In [None]:
rddUniqueCount = rddUnique.count()

In [None]:
rddUnique=spark.sparkContext.parallelize([rddUniqueCount])

In [None]:
rddUnique.saveAsTextFile("test_out/out_1_2b.txt")

#### Task 3

In [None]:
values = rdd.countByValue().items()

In [None]:
rddValues=spark.sparkContext.parallelize(values)

In [None]:
rddValues.collect()

In [None]:
rddValues.saveAsTextFile("test_out/out_1_3.txt")

### Part 2: Spark Dataframe API

#### Task 1

In [None]:
file_path = "files/part-00000.parquet"

In [None]:
df = spark.read.parquet(file_path)

In [None]:
df.columns

In [None]:
type(df)

In [None]:
type(spark)

In [None]:
df.show(vertical=True)

#### Task 2

In [None]:
import pyspark.sql.functions as f

In [None]:
df.select([f.max("price")]).show()

In [None]:
df.select([f.min("price")]).show()

In [None]:
df.count()

In [None]:
dfStats = df.select([f.min("price"), f.max("price"), f.count("price")])

In [None]:
dfStats.show()

In [None]:
dfStats.coalesce(1).write.csv("test3_out/out_2_2.txt")

#### Task 3

In [None]:
group_cols = ["bedrooms", "beds"]

In [None]:
df_filtered = df.filter(df.price > 5000).filter(df.review_scores_value == 10)

In [None]:
df_filtered.show(vertical=True)

In [None]:
df_selected = df_filtered.select(f.mean('bathrooms'), f.mean('bedrooms'))

In [None]:
df_selected.show()

In [None]:
df_selected = df_selected.withColumnRenamed("avg(bathrooms)", "avg_bathrooms").withColumnRenamed("avg(bedrooms)", "avg_bedrooms")

In [None]:
df_selected.show()

In [None]:
df_selected.write.csv("test_out/ut_2_3.csv")

#### Task 4

In [None]:
minPrice = df.select("price").rdd.min()[0]

In [None]:
dfPeople = df.filter(df.price == minPrice).select("review_scores_value", 'beds')

In [None]:
dfPeople.show()

In [None]:
maxRating = df.select("review_scores_value").rdd.max()[0]

In [None]:
dfPeople = dfPeople.filter(dfPeople.review_scores_value == maxRating).select("review_scores_value", 'beds')

In [None]:
dfPeople.show()

In [None]:
dfPeople = df.filter(df.price == minPrice).select("review_scores_value", 'beds').show()

In [None]:
dfPeople = df.filter(df.price == minPrice).sort("review_scores_value")#.select('beds')

In [None]:
df.filter(df.price == minPrice)

#### Task 5

In [None]:
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator

with DAG(
    "etl_sales_daily",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    
    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")
    task3 = DummyOperator(task_id="task3")
    task4 = DummyOperator(task_id="task4")
    task5 = DummyOperator(task_id="task5")
    task6 = DummyOperator(task_id="task6")
    
    task1 >> [task2, task3]
    [task2, task3] >> [task4, task5, task6]

### Part 3: Applied Machine Learning

#### Task 1

In [1]:
from pyspark.sql import SparkSession

In [2]:
import pyspark.sql.functions as f

In [3]:
spark = SparkSession \
    .builder \
    .appName("Spark Task") \
    .getOrCreate()

#### Task 2

In [4]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

In [5]:
dfIris = spark.read.csv('iris.csv')

In [6]:
dfIris.show()

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3|        _c4|
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
|5.4|3.9|1.7|0.4|Iris-setosa|
|4.6|3.4|1.4|0.3|Iris-setosa|
|5.0|3.4|1.5|0.2|Iris-setosa|
|4.4|2.9|1.4|0.2|Iris-setosa|
|4.9|3.1|1.5|0.1|Iris-setosa|
|5.4|3.7|1.5|0.2|Iris-setosa|
|4.8|3.4|1.6|0.2|Iris-setosa|
|4.8|3.0|1.4|0.1|Iris-setosa|
|4.3|3.0|1.1|0.1|Iris-setosa|
|5.8|4.0|1.2|0.2|Iris-setosa|
|5.7|4.4|1.5|0.4|Iris-setosa|
|5.4|3.9|1.3|0.4|Iris-setosa|
|5.1|3.5|1.4|0.3|Iris-setosa|
|5.7|3.8|1.7|0.3|Iris-setosa|
|5.1|3.8|1.5|0.3|Iris-setosa|
+---+---+---+---+-----------+
only showing top 20 rows



In [15]:
dfIris.select('_c4').distinct().show()

+---------------+
|            _c4|
+---------------+
| Iris-virginica|
|    Iris-setosa|
|Iris-versicolor|
+---------------+



In [16]:
mapping = {'Iris-virginica' : "1", "Iris-setosa" : "2", "Iris-versicolor" : "3"}

In [17]:
dfIrisLabeled = dfIris.withColumnRenamed('_c4', 'label')

In [18]:
dfIrisLabeled = dfIrisLabeled.replace(to_replace=mapping, subset=['label'])

In [19]:
dfIrisLabeled = dfIrisLabeled.withColumn("label", dfIrisLabeled.label.cast('int'))

In [39]:
dfIrisInt = (dfIrisLabeled.withColumn("_c0", dfIrisLabeled._c0.cast('float'))
                 .withColumn("_c1", dfIrisLabeled._c1.cast('float'))
                 .withColumn("_c2", dfIrisLabeled._c2.cast('float'))
                 .withColumn("_c3", dfIrisLabeled._c3.cast('float')))

In [40]:
dfIrisInt.show()

+---+---+---+---+-----+
|_c0|_c1|_c2|_c3|label|
+---+---+---+---+-----+
|5.1|3.5|1.4|0.2|    2|
|4.9|3.0|1.4|0.2|    2|
|4.7|3.2|1.3|0.2|    2|
|4.6|3.1|1.5|0.2|    2|
|5.0|3.6|1.4|0.2|    2|
|5.4|3.9|1.7|0.4|    2|
|4.6|3.4|1.4|0.3|    2|
|5.0|3.4|1.5|0.2|    2|
|4.4|2.9|1.4|0.2|    2|
|4.9|3.1|1.5|0.1|    2|
|5.4|3.7|1.5|0.2|    2|
|4.8|3.4|1.6|0.2|    2|
|4.8|3.0|1.4|0.1|    2|
|4.3|3.0|1.1|0.1|    2|
|5.8|4.0|1.2|0.2|    2|
|5.7|4.4|1.5|0.4|    2|
|5.4|3.9|1.3|0.4|    2|
|5.1|3.5|1.4|0.3|    2|
|5.7|3.8|1.7|0.3|    2|
|5.1|3.8|1.5|0.3|    2|
+---+---+---+---+-----+
only showing top 20 rows



In [41]:
assembler = VectorAssembler(inputCols = ['_c0', '_c1', '_c2', '_c3'], outputCol='features')
output = assembler.transform(dfIrisInt)

In [42]:
finalised_data = output.select('features', 'label')

In [43]:
finalised_data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[5.09999990463256...|    2|
|[4.90000009536743...|    2|
|[4.69999980926513...|    2|
|[4.59999990463256...|    2|
|[5.0,3.5999999046...|    2|
|[5.40000009536743...|    2|
|[4.59999990463256...|    2|
|[5.0,3.4000000953...|    2|
|[4.40000009536743...|    2|
|[4.90000009536743...|    2|
|[5.40000009536743...|    2|
|[4.80000019073486...|    2|
|[4.80000019073486...|    2|
|[4.30000019073486...|    2|
|[5.80000019073486...|    2|
|[5.69999980926513...|    2|
|[5.40000009536743...|    2|
|[5.09999990463256...|    2|
|[5.69999980926513...|    2|
|[5.09999990463256...|    2|
+--------------------+-----+
only showing top 20 rows



In [44]:
train, test = finalised_data.randomSplit([0.7, 0.3])

In [45]:
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True, labelCol='label', featuresCol='features')

In [46]:
fit_model = lr.fit(train)

In [47]:
lrn_summary = fit_model.summary
lrn_summary.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[4.30000019073486...|  2.0|[-2.7665473569707...|[4.83533237531611...|       2.0|
|[4.40000009536743...|  2.0|[-2.8575961317871...|[6.02016740563160...|       2.0|
|[4.40000009536743...|  2.0|[-2.8052348372137...|[2.54823043397787...|       2.0|
|[4.40000009536743...|  2.0|[-2.7165255879470...|[8.33176041133090...|       2.0|
|[4.5,2.2999999523...|  2.0|[-3.1383912021474...|[3.65114949494463...|       2.0|
|[4.59999990463256...|  2.0|[-2.7895770595776...|[6.13743723739274...|       2.0|
|[4.59999990463256...|  2.0|[-2.6648387811164...|[1.61453429499633...|       2.0|
|[4.59999990463256...|  2.0|[-2.5277703847562...|[8.33864900506284...|       2.0|
|[4.69999980926513...|  2.0|[-2.7355506106862...|[2.92155865107416...|       2.0|
|[4.699999809265

In [48]:
lrn_summary.predictions.describe().show()

+-------+------------------+------------------+
|summary|             label|        prediction|
+-------+------------------+------------------+
|  count|               109|               109|
|   mean| 1.963302752293578| 1.963302752293578|
| stddev|0.8042319260174194|0.8042319260174194|
|    min|               1.0|               1.0|
|    max|               3.0|               3.0|
+-------+------------------+------------------+



In [None]:
assembler = VectorAssembler(inputCols = ['_c0', '_c1', '_c2', '_c3'], outputCol='features')
output = assembler.transform(dfIrisInt)

In [49]:
pred_data = spark.createDataFrame(
    [(5.1, 3.5, 1.4, 0.2),
     (6.2, 3.4, 5.4, 2.3)],
    ["sepal_length", "sepal_width", "petal_length", "petal_width"])

In [50]:
pred_data.show()

+------------+-----------+------------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|
+------------+-----------+------------+-----------+
|         5.1|        3.5|         1.4|        0.2|
|         6.2|        3.4|         5.4|        2.3|
+------------+-----------+------------+-----------+



In [53]:
assembler = VectorAssembler(inputCols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')
pred_data_acc = assembler.transform(pred_data)

In [60]:
pred_data_acc_ = pred_data_acc.select('features')

In [66]:
fit_model.transform(pred_data_acc_).show()

+-----------------+--------------------+--------------------+----------+
|         features|       rawPrediction|         probability|prediction|
+-----------------+--------------------+--------------------+----------+
|[5.1,3.5,1.4,0.2]|[-2.6358602471710...|[3.93083743216410...|       2.0|
|[6.2,3.4,5.4,2.3]|[-3.4132219437368...|[1.16112013199064...|       1.0|
+-----------------+--------------------+--------------------+----------+



In [67]:
def predictModel(spark, model):
    """Predict given results model
    
    Args:
        spark: pyspark.sql.session.SparkSession
        model: LinearRegression
    Return:
        df: dataframe with predictions, pyspark.sql.dataframe.DataFrame
    """
    predData = spark.createDataFrame(
        [(5.1, 3.5, 1.4, 0.2),
         (6.2, 3.4, 5.4, 2.3)],
        ["sepal_length", "sepal_width", "petal_length", "petal_width"])
    assembler = VectorAssembler(inputCols = ['sepal_length',
                                             'sepal_width',
                                             'petal_length',
                                             'petal_width'],
                                outputCol='features')
    predDataAcc = assembler.transform(predData)
    predFeatures = predDataAcc.select('features')
    predicts = model.transform(predFeatures)
    return predicts

In [80]:
dfff = predictModel(spark, fit_model)

In [81]:
dfff.show()

+-----------------+--------------------+--------------------+----------+
|         features|       rawPrediction|         probability|prediction|
+-----------------+--------------------+--------------------+----------+
|[5.1,3.5,1.4,0.2]|[-2.6358602471710...|[3.93083743216410...|       2.0|
|[6.2,3.4,5.4,2.3]|[-3.4132219437368...|[1.16112013199064...|       1.0|
+-----------------+--------------------+--------------------+----------+



In [86]:
from pyspark.sql.types import StringType

In [89]:
dffff_ = (dfff.withColumn("features", f.col('features').cast(StringType()))
          .withColumn("rawPrediction", f.col('rawPrediction').cast(StringType()))
          .withColumn("probability", f.col('probability').cast(StringType()))
          .withColumn("prediction", f.col('prediction').cast(StringType()))).show()

+-----------------+--------------------+--------------------+----------+
|         features|       rawPrediction|         probability|prediction|
+-----------------+--------------------+--------------------+----------+
|[5.1,3.5,1.4,0.2]|[-2.6358602471710...|[3.93083743216410...|       2.0|
|[6.2,3.4,5.4,2.3]|[-3.4132219437368...|[1.16112013199064...|       1.0|
+-----------------+--------------------+--------------------+----------+



In [73]:
dfff.write.csv('ttt.txt')

AnalysisException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.

In [61]:
fit_model.predict(pred_data_acc_)

Py4JJavaError: An error occurred while calling o341.predict.
: java.lang.ClassCastException: class org.apache.spark.sql.Dataset cannot be cast to class org.apache.spark.ml.linalg.Vector (org.apache.spark.sql.Dataset and org.apache.spark.ml.linalg.Vector are in unnamed module of loader 'app')
	at org.apache.spark.ml.classification.LogisticRegressionModel.predict(LogisticRegression.scala:1055)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [None]:
pred_labels.predictions.show()
eval = BinaryClassificationEvaluator(rawPredictionCol = "prediction", labelCol = "churn")
auc = eval.evaluate(pred_labels.predictions)
print(auc)