In [0]:
# PART 1: Spark RDD API

## Task 1: Downloads data file and makes it available to Spark

from pyspark.sql import SparkSession
import os
import requests

# Instantiates (or reuses) the SparkSession
spark = SparkSession.builder.getOrCreate()

# Parameters: where the groceries dataset lives and where it lands in DBFS
dbfs_path = "dbfs:/tmp/input_data/"
file_name = "groceries.csv"
csv_url = "https://raw.githubusercontent.com/stedy/Machine-Learning-with-R-datasets/master/groceries.csv"


def download_file(url, file_name, dbfs_path, timeout=60):
    '''
    Downloads a single file from url and pushes it to DBFS.

            Parameters:
                    url (str): url of the site where the file is hosted
                    file_name (str): Name of the file
                    dbfs_path (str): Target directory where the file will be saved
                    timeout (float): Seconds to wait for the server before giving up

            Returns:
                    target_path (str): path to the saved file

            Raises:
                    RuntimeError: if the server does not answer with HTTP 200
    '''
    # gets file from url; the timeout keeps a stalled server from hanging the cell forever
    res = requests.get(url, allow_redirects=True, timeout=timeout)
    # explicit check rather than `assert`, which is stripped when Python runs with -O
    if res.status_code != 200:
        raise RuntimeError("Failed to download file: {}".format(res.text))
    # puts file into dbfs, replacing any previous copy
    target_path = os.path.join(dbfs_path, file_name)
    dbutils.fs.put(target_path, res.text, overwrite=True)
    return target_path

def csv_to_df(dbfs_file):
    '''
    Makes a comma-separated file stored in DBFS available to Spark as a
    pyspark.sql.dataframe.DataFrame.

    Lines may contain different numbers of fields (groceries.csv holds one
    basket of items per line). Without a schema, Spark's CSV reader takes the
    column count from the first line and drops the extra fields of longer
    lines. So the widest line is measured first and an explicit all-string
    schema of that width is used. Shorter lines are padded with nulls; columns
    are named _c0, _c1, ... as with the default CSV reader.

            Parameters:
                    dbfs_file (str): Path to source file

            Returns:
                    out_df (pyspark.sql.dataframe.DataFrame): csv contents,
                    one string column per field position
    '''
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType

    # Upper bound on the number of fields in any line. A plain comma count is
    # enough here: overestimating only adds an all-null column.
    max_fields = spark.read.text(dbfs_file)\
        .select(F.size(F.split(F.col('value'), ',')).alias('n_fields'))\
        .agg(F.max('n_fields'))\
        .first()[0] or 0
    schema = StructType([StructField('_c{}'.format(i), StringType(), True)
                         for i in range(max_fields)])

    out_df = spark.read.format('csv')\
       .option('header', 'false')\
       .schema(schema)\
       .load(dbfs_file)
    return out_df

# Fetch the CSV into DBFS, then expose it to Spark as a DataFrame
dbfs_file = download_file(url=csv_url, file_name=file_name, dbfs_path=dbfs_path)
groceries_df = csv_to_df(dbfs_file=dbfs_file)

In [0]:
# PART 1: Spark RDD API

## Task 2 - Parts a and b: unique list of products and total count of items

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

def file_exists(path):
    """Return True when `path` exists in DBFS and False when it does not.

    Any failure other than a Java FileNotFoundException is re-raised.
    """
    try:
        dbutils.fs.ls(path)
    except Exception as err:
        if 'java.io.FileNotFoundException' not in str(err):
            raise
        return False
    return True
      
def create_pair(item):
    """Pair `item` with an initial count of 1 (a key/value pair for reduceByKey)."""
    pair = (item, 1)
    return pair

# Flattens every basket row into its items and drops the null padding of
# shorter baskets. The 1s are then summed per product, so the result holds one
# (product name, number of occurrences) pair per unique product.
groceries_summary = groceries_df.rdd\
    .flatMap(list)\
    .filter(lambda item: item is not None)\
    .map(create_pair)\
    .reduceByKey(lambda a, b: a + b)

unique_products = groceries_summary.keys()


# all output files are placed in the same dir
output_path = "dbfs:/tmp/out"
dbutils.fs.mkdirs(output_path)

# list of unique products.
# saveAsTextFile refuses to write into an existing directory, so any previous
# result is removed first. This keeps re-runs in sync with the current data,
# like the "overwrite" writes used for the other outputs.
filename = os.path.join(output_path, "out_1_2a.txt")
if file_exists(filename):
    dbutils.fs.rm(filename, True)
unique_products.coalesce(1).saveAsTextFile(filename)

# count of total items: the per-product counts are summed on the cluster
# rather than collected to the driver first
filename = os.path.join(output_path, "out_1_2b.txt")
total_items = groceries_summary.values().sum()

# creates spark dataframe with column name "count" and writes it to file
schema = StructType([StructField("count", IntegerType(), True)])
data = [(total_items,)]
total_items_df = spark.createDataFrame(data=data, schema=schema)
total_items_df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("header", "true")\
    .save(filename)

In [0]:
# PART 1: Spark RDD API

## Task 3: Top 5 products

filename = os.path.join(output_path, "out_1_3.txt")
# Five most frequent products. Sorting on (-count, product) breaks count ties
# alphabetically, so the selection does not depend on partition order.
top5groceries = groceries_summary.takeOrdered(5, key=lambda pair: (-pair[1], pair[0]))

# creates spark dataframe with columns "product" and "count" and writes it to file
schema = StructType([StructField("product", StringType(), True),
                     StructField("count", IntegerType(), True)])
top5groceries_df = spark.createDataFrame(data=top5groceries, schema=schema)
top5groceries_df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("header", "true")\
    .save(filename)

In [0]:
# PART 2: Spark Dataframe API

## Task 1: Loads the parquet file (pre-uploaded to DBFS) and makes it available to Spark

from pyspark import SparkContext,SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType
import os

# Instantiates (or reuses) the SparkSession
spark = SparkSession.builder.getOrCreate()

# NOTE: `url` only records where the data comes from; nothing in this cell
# downloads it. The parquet files are read from `file_path`, which must already
# exist in DBFS.
url = "https://github.com/databricks/LearningSparkV2/blob/master/mlflow-project-example/data/sf-airbnb-clean.parquet/"
file_path = "dbfs:/FileStore/tables/sf-airbnb-clean/"

# every output file of this part goes to the same directory
output_path = "dbfs:/tmp/out"
dbutils.fs.mkdirs(output_path)

airbnb_df = spark.read.format("parquet").load(file_path)

In [0]:
# PART 2: Spark Dataframe API

## Task 2: Creates CSV that lists the minimum price, maximum price and total row count

from pyspark.sql import functions as F

# Minimum price, maximum price and total row count, computed together in a
# single aggregation over airbnb_df.
price_stats = airbnb_df.agg(
    F.min('price').alias('min_price'),
    F.max('price').alias('max_price'),
    F.count(F.lit(1)).alias('total'),
).first()
summary = (price_stats['min_price'], price_stats['max_price'], price_stats['total'])

filename = os.path.join(output_path, "out_2_2.txt")
schema = StructType([StructField("min_price", FloatType(), True),
                     StructField("max_price", FloatType(), True),
                     StructField("total", IntegerType(), True)])

summary_df = spark.createDataFrame(data=[summary], schema=schema)
summary_df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("header", "true")\
    .save(filename)

In [0]:
# PART 2: Spark Dataframe API

## Task 3: Calculate the average number of bathrooms and bedrooms across all the properties listed in this data set with a price > 5000 and a review score (review_scores_value) exactly equal to 10.

from pyspark.sql import functions as F

# Properties with a price above 5000 and a review_scores_value of exactly 10.
selected_df = airbnb_df\
    .filter((airbnb_df.price > 5000) & (airbnb_df.review_scores_value == 10))

# Both averages are taken over the filtered properties (selected_df), in one
# aggregation. Over an empty selection they come back as null.
avg_row = selected_df.agg(
    F.avg('bathrooms').alias('bathrooms'),
    F.avg('bedrooms').alias('bedrooms'),
).first()
avg_summary = (avg_row['bathrooms'], avg_row['bedrooms'])

filename = os.path.join(output_path, "out_2_3.txt")
schema = StructType([StructField("bathrooms", FloatType(), True),
                     StructField("bedrooms", FloatType(), True)])
avg_summary_df = spark.createDataFrame(data=[avg_summary], schema=schema)
avg_summary_df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("header", "true")\
    .save(filename)

In [0]:
# PART 2: Spark Dataframe API

## Task 4: How many people can be accommodated by the property with the lowest price and highest rating?

# Cheapest listing first, ties broken by the highest review_scores_rating.
# Nulls are sorted last for both keys. Spark's plain asc() puts nulls first,
# which would make a listing with a missing price the "best deal".
best_deal = airbnb_df\
    .orderBy(airbnb_df.price.asc_nulls_last(),
             airbnb_df.review_scores_rating.desc_nulls_last())\
    .select('accommodates')\
    .first()
# first() returns None on an empty DataFrame; a null value is written in that case
naccomodates_bestdeal = best_deal[0] if best_deal is not None else None

filename = os.path.join(output_path, "out_2_4.txt")
schema = StructType([StructField("n_people", FloatType(), True)])
data = [(naccomodates_bestdeal,)]
naccomodates_bestdeal_df = spark.createDataFrame(data=data, schema=schema)
naccomodates_bestdeal_df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("header", "true")\
    .save(filename)

In [0]:
%sh pip install "apache-airflow[databricks]"

In [0]:
# PART 2: Spark Dataframe API

## Task 5: Apache Airflow

from datetime import datetime

from airflow import DAG

# DummyOperator was superseded by EmptyOperator (Airflow 2.3). Airflow 3 moved
# EmptyOperator into the "standard" provider and dropped the old names, so
# import whichever location the installed version provides.
try:
    from airflow.providers.standard.operators.empty import EmptyOperator
except ImportError:
    try:
        from airflow.operators.empty import EmptyOperator
    except ImportError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator

# Default arguments applied to each task in the DAG. A static start_date is
# used because Airflow advises against dynamic values such as days_ago(0)
# (days_ago itself is deprecated and removed in Airflow 3).
args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
}

# Unique DAG id, the default arguments and a daily schedule. catchup=False
# stops the scheduler from back-filling every day since start_date.
dag_kwargs = dict(dag_id='Task_2_5_DAG', default_args=args, catchup=False)
try:
    dag = DAG(schedule='@daily', **dag_kwargs)  # Airflow >= 2.4 (the only option in 3.x)
except TypeError:
    dag = DAG(schedule_interval='@daily', **dag_kwargs)  # older Airflow 2.x

task1 = EmptyOperator(task_id='Task_1', dag=dag)
task2 = EmptyOperator(task_id='Task_2', dag=dag)
task3 = EmptyOperator(task_id='Task_3', dag=dag)
task4 = EmptyOperator(task_id='Task_4', dag=dag)
task5 = EmptyOperator(task_id='Task_5', dag=dag)
task6 = EmptyOperator(task_id='Task_6', dag=dag)

# Task_1 fans out to Task_2 and Task_3; each of those feeds Task_4..Task_6.
task1 >> [task2, task3]
task2 >> [task4, task5, task6]
task3 >> [task4, task5, task6]

In [0]:
 %sh curl -L --fail "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data" -o "/tmp/iris.csv"  # --fail: exit non-zero on HTTP errors instead of saving the error page as data

In [0]:
# PART 3: Applied ML

## Task 1 sklearn

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

local_file = "file:/tmp/iris.csv"
feature_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
# iris.data has no header row, so the column names are supplied explicitly.
df = pd.read_csv(local_file, names=feature_cols + ["class"])

# Separate features from class. Selecting the feature columns by name keeps X
# a float matrix; df.values would be an object array because of the string
# "class" column.
X = df[feature_cols].to_numpy(dtype=float)
y = df["class"].to_numpy()

# Fit a weakly regularised (large C) logistic regression. max_iter is raised
# above the default of 100 so the default solver has room to converge.
logreg = LogisticRegression(C=1e5, max_iter=1000)
logreg.fit(X, y)

# Sanity check on two rows taken from the training data; expected labels:
# 5.1     3.5     1.4     0.2     Iris-setosa
# 6.2     3.4     5.4     2.3     Iris-virginica
print(logreg.predict([[5.1, 3.5, 1.4, 0.2]]))
print(logreg.predict([[6.2, 3.4, 5.4, 2.3]]))

In [0]:
# PART 3: Applied ML

## Task 2: MLlib

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoder, IndexToString
from pyspark.sql import SparkSession
import os


# Instantiates (or reuses) the SparkSession
spark = SparkSession.builder.getOrCreate()

# all output files are placed in the same dir
output_path = "dbfs:/tmp/out"
dbutils.fs.mkdirs(output_path)

# uploads the file downloaded with curl into dbfs
local_file = "file:/tmp/iris.csv"
dbfs_file = "dbfs:/tmp/input_data/iris.csv"
dbutils.fs.cp(local_file, dbfs_file)

col_names = ["sepal_length", "sepal_width", "petal_length", "petal_width", "class"]

schema = """`sepal_length` DOUBLE,
            `sepal_width` DOUBLE,
            `petal_length` DOUBLE,
            `petal_width` DOUBLE,
            `class` STRING
        """

df = spark.read.csv(dbfs_file, schema=schema)

# The four numeric measurements are the features. There is nothing to one-hot
# encode: the only categorical column, "class", is the label.
numericCols = col_names[:4]

# StringIndexer maps each class name to a numeric label. Its fitted `labels`
# let IndexToString translate numeric predictions back to class names.
labelToIndex = StringIndexer(inputCol="class", outputCol="indexed_class")
labelIndexer = labelToIndex.fit(df)
labelReverser = IndexToString(inputCol="prediction", outputCol="class", labels=labelIndexer.labels)

vecAssembler = VectorAssembler(inputCols=numericCols, outputCol="features")

# In Spark, regParam is the regularisation *strength*; sklearn's C is its
# inverse. A value as large as 1e5 shrinks every coefficient towards zero, so
# predictions degenerate to (nearly) a single class. A small value mirrors the
# weakly regularised sklearn model (C=1e5).
lr = LogisticRegression(featuresCol="features", labelCol="indexed_class", regParam=1e-5)

# The already-fitted indexer is reused, so the labels used by labelReverser are
# exactly the ones the model was trained on (and the indexer is not refit).
pipeline = Pipeline(stages=[labelIndexer, vecAssembler, lr, labelReverser])

# Fit the pipeline model.
pipelineModel = pipeline.fit(df)

test_df = spark.createDataFrame([
    (5.1, 3.5, 1.4, 0.2),
    (6.2, 3.4, 5.4, 2.3)
], numericCols)

# Apply the pipeline model to the test dataset.
pred_df = pipelineModel.transform(test_df)

filename = os.path.join(output_path, "out_3_2.txt")
pred_df.select("class").coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("header", "true")\
    .save(filename)

pred_df.select("class").show()

In [0]:
# Reads back the predictions written by the MLlib task and renders them
output_path = "dbfs:/tmp/out"
file_name = "out_3_2.txt"
output_file = os.path.join(output_path, file_name)
output = spark.read\
  .options(inferSchema="true", header="true")\
  .csv(output_file)
display(output)