## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
import pyspark
from pyspark import SparkFiles
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_date
from pyspark.sql.functions import randn #For generating dataframe 
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
import pandas as pd
import numpy as np

<h1><b>Problem 1</b></h1>
<p>1. Load “linearRegressionData.csv” to Databricks environment.</p>
<p>2. Create a dataframe with 500k simulated data records from “linearRegressionData.csv”</p>
<p>3. Load “autoMPGDataModified.csv” to Databricks environment</p>
<p>4. Create a dataframe with 500k simulated data records from “autoMPGDataModified.csv”</p>

In [0]:
# File location and type
file_location1="dbfs:/FileStore/shared_uploads/h20210811@pilani.bits-pilani.ac.in/autoMPGDataModified___Sheet1-2.csv"
file_location2 ="dbfs:/FileStore/shared_uploads/h20210811@pilani.bits-pilani.ac.in/linearRegressionData_csv___Sheet1-1.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

autompg = spark.read.format(file_type).option("inferSchema", infer_schema) .option("header", first_row_is_header).option("sep", delimiter) .load(file_location1)
linerregression = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", first_row_is_header).option("sep", delimiter).load(file_location2)

display(autompg)
display(linerregression)

mpg,displacement,horsepower,weight,accelaration
18,307,18,3504,12.0
15,350,36,3693,11.5
18,318,30,3436,11.0
16,304,30,3433,12.0
17,302,25,3449,10.5


dvs1,ivs1,ivs2,ivs3
34.63,5.53,5.58,5.41
40.89,3.89,6.48,6.97
37.25,5.07,4.5,6.5
45.09,5.81,5.71,8.59
39.4,5.61,5.79,6.77


In [0]:
# Checking Schema of thre DataFrames
autompg.printSchema()
linerregression.printSchema()

root
 |-- mpg: integer (nullable = true)
 |-- displacement: integer (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- accelaration: double (nullable = true)

root
 |-- dvs1: double (nullable = true)
 |-- ivs1: double (nullable = true)
 |-- ivs2: double (nullable = true)
 |-- ivs3: double (nullable = true)



In [0]:
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = autompg.select(_mean(col('mpg')).alias('mean'),_stddev(col('columnName')).alias('std')
).collect()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1559334314662778>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mfunctions[0m [0;32mimport[0m [0mmean[0m [0;32mas[0m [0m_mean[0m[0;34m,[0m [0mstddev[0m [0;32mas[0m [0m_stddev[0m[0;34m,[0m [0mcol[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;34m[0m[0m
[0;32m----> 3[0;31m df_stats = autompg.select(_mean(col('mpg')).alias('mean'),_stddev(col('columnName')).alias('std')
[0m[1;32m      4[0m ).collect()

[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mselect[0;34m(self, *cols)[0m
[1;32m   1824[0m         [0;34m[[0m[0mRow[0m[0;34m([0m[0mname[0m[0;34m=[0m[0;34m'Alice'[0m[0;34m,[0m [0mage[0m[0;34m=[0m[0;36m12[0m[0;34m)[0m[0;34m,[0m [0mRow[0m[0;34m

In [0]:
# Coverting RDD to dataframe
autompg=autompg.toPandas()
linerregression=linerregression.toPandas()

In [0]:
import numpy as np

myList = autompg.collect()
total = []
for product,nb in myList:
    for p2,score in nb:
            total.append(score)
mean = np.mean(total)
std = np.std(total)

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
[0;32m<command-1559334314662775>[0m in [0;36m<module>[0;34m[0m
[1;32m      3[0m [0mmyList[0m [0;34m=[0m [0mautompg[0m[0;34m.[0m[0mcollect[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m [0mtotal[0m [0;34m=[0m [0;34m[[0m[0;34m][0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 5[0;31m [0;32mfor[0m [0mproduct[0m[0;34m,[0m[0mnb[0m [0;32min[0m [0mmyList[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      6[0m     [0;32mfor[0m [0mp2[0m[0;34m,[0m[0mscore[0m [0;32min[0m [0mnb[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      7[0m             [0mtotal[0m[0;34m.[0m[0mappend[0m[0;34m([0m[0mscore[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mValueError[0m: too many values to unpack (expected 2)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1559334314662776>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mfunctions[0m [0;32mimport[0m [0mmean[0m [0;32mas[0m [0m_mean[0m[0;34m,[0m [0mstddev[0m [0;32mas[0m [0m_stddev[0m[0;34m,[0m [0mcol[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;34m[0m[0m
[0;32m----> 3[0;31m df_stats = autompg.select(
[0m[1;32m      4[0m     [0m_mean[0m[0;34m([0m[0mcol[0m[0;34m([0m[0;34m'mpg'[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0malias[0m[0;34m([0m[0;34m'mean'[0m[0;34m)[0m[0;34m,[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m     [0m_stddev[0m[0;34m([0m[0mcol[0m[0;34m([0m[0;34m'columnName'[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0malias[0m[0;34m([0m[0;34m'std'[0m[0;34m)[0m

<b>Simulating the values for each column  based on given dataset using random function

In [0]:
def Simulate():
    

In [0]:

N = 500000   

# For autompg.csv
mpgmean = autompg.agg({'mpg': 'mean'})
mpgstd = autompg['mpg'].std()
random_mpgmean = np.random.normal(loc=mpgmean, scale=mpgstd, size=N)

displacementmean = autompg.agg({'displacement': 'mean'})
displacementstd = autompg['displacement'].std()
random_displacement = np.random.normal(loc=mpgmean, scale=mpgstd, size=N)

horsepowermean = autompg.agg({'horsepower': 'mean'})
horsepowerstd = autompg['horsepower'].std()
random_horsepower = np.random.normal(loc=mpgmean, scale=mpgstd, size=N)
 
weightmean = autompg.agg({'weight': 'mean'})
weightstd = autompg['weight'].std()
random_weight = np.random.normal(loc=mpgmean, scale=mpgstd, size=N)
 
accelarationmean = autompg.agg({'accelaration': 'mean'})
accelarationstd = autompg['accelaration'].std()
random_acceleration = np.random.normal(loc=accelarationmean, scale=accelarationstd, size=N)

# For linearregression.csv
dvs1mean = linerregression.agg({'dvs1': 'mean'})
dvs1std = linerregression['dvs1'].std()
random_dvs1mean = np.random.normal(loc=dvs1mean, scale=dvs1std, size=N)

ivs1mean = linerregression.agg({'ivs1': 'mean'})
ivs1std = linerregression['ivs1'].std()
random_ivs1mean = np.random.normal(loc=ivs1mean, scale=ivs1std, size=N)

ivs2mean = linerregression.agg({'ivs2': 'mean'})
ivs2std = linerregression['ivs2'].std()
random_ivs2mean = np.random.normal(loc=ivs2mean, scale=ivs2std, size=N)
 
ivs3mean = linerregression.agg({'ivs3': 'mean'})
ivs3std = linerregression['ivs3'].std()
random_ivs3mean = np.random.normal(loc=ivs3mean, scale=ivs3std, size=N)

<b> Creating a DataFrame

In [0]:
# For autompg
autompg_f = pd.DataFrame()
autompg_f['mpg'] = random_mpgmean.tolist()
autompg_f['displacement'] = random_displacement.tolist()
autompg_f['horsepower'] = random_horsepower.tolist()
autompg_f['weight'] = random_weight.tolist()
autompg_f['accelaration'] = random_acceleration.tolist()

# For linearregression
linerregression = pd.DataFrame()
linerregression['dvs1'] = random_dvs1mean.tolist()
linerregression['ivs1'] = random_ivs1mean.tolist()
linerregression['ivs2'] = random_ivs2mean.tolist()
linerregression['ivs3'] = random_ivs3mean.tolist()

In [0]:
linerregression

Unnamed: 0,dvs1,ivs1,ivs2,ivs3
0,36.085466,5.619643,5.097119,7.480739
1,38.288057,5.407446,5.454967,7.324666
2,38.625439,4.465941,8.139343,5.369966
3,43.354791,5.135403,4.627853,8.705401
4,42.266959,5.213980,6.019043,7.564682
...,...,...,...,...
499995,36.709499,5.421075,4.630619,6.315032
499996,26.962126,4.585389,5.642680,4.349987
499997,36.258530,5.552004,4.059753,6.895400
499998,40.331565,4.675221,6.465247,6.455254


In [0]:
autompg_f

Unnamed: 0,mpg,displacement,horsepower,weight,accelaration
0,17.530923,15.225269,16.047371,15.293425,11.686335
1,17.321184,18.252078,16.561634,17.130608,11.332735
2,16.173547,16.952745,18.676730,16.053653,11.763519
3,17.347765,18.168999,15.918808,17.404584,10.788195
4,16.917794,14.578168,15.926359,16.147276,11.891575
...,...,...,...,...,...
499995,16.318475,19.156410,16.785157,17.041991,12.111606
499996,17.200961,16.033334,17.911956,17.212961,11.209670
499997,15.256700,16.516935,15.032488,17.218206,11.445747
499998,16.833749,16.583817,15.961892,17.542889,12.819243


<h1><b>Problem 2</b></h1>
<p>1. For data “linearRegressionData.csv”, transform Dataframe  to RDD.</p>
<p>2. Create an RDD of the labeled point.</p>
<p>3. Divide the data into a training and testing set.</p>
<p>4. Create a linear regression model.</p>
<p>5. Train and save the model.</p>
<p>6. Predict data using the saved model.</p>
<p>7. Evaluate the created model and check its accuracy.</p>

In [0]:
from pyspark.mllib.regression import LabeledPoint
import pyspark.mllib
import pyspark.mllib.regression
from pyspark.sql.functions import *
import numpy as np
from pyspark.mllib.linalg import Vectors

# Created RDD FILE
autompg_f = spark.createDataFrame(autompg_f)
autompg_frdd=autompg_f.rdd

# Labeled Point
labeledpoint=Vectors.dense(autompg_frdd.values().collect())

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-3137207080508732>[0m in [0;36m<module>[0;34m[0m
[1;32m     11[0m [0;34m[0m[0m
[1;32m     12[0m [0;31m# Labeled Point[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 13[0;31m [0mlabeledpoint[0m[0;34m=[0m[0mVectors[0m[0;34m.[0m[0mdense[0m[0;34m([0m[0mautompg_frdd[0m[0;34m.[0m[0mvalues[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0mcollect[0m[0;34m([0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mrdd[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/mllib/linalg/__init__.py[0m in [0;36m__getattr__[0;34m(self, item)[0m
[1;32m    483[0m [0;34m[0m[0m
[1;32m    484[0m     [0;32mdef[0m [0m__getattr__[0m[0;34m([0m[0mself[0m[0;34m,[0m [0mitem[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 485[0;31m         [0;32mret

In [0]:
autompg_frdd.take(5)



In [0]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

from pyspark.sql import functions as sql_functions




In [0]:
first_point_features = parsed_points_df.first().features
first_point_label = parsed_points_df.first().label



In [0]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Row", "displacement", "horsepower",],outputCol="features")
transformed = assembler.transform(parsedData)



In [0]:
def from_data_frame(df, categorical=False, nb_classes=None):
    '''
    Convert DataFrame back to pair of numpy arrays
    '''
    lp_rdd = df.rdd.map(lambda row: LabeledPoint(row.label, row.features))
    features, labels = from_labeled_point(lp_rdd, categorical, nb_classes)
    return features, labels

a=from_data_frame(autompg_f)



In [0]:
def parse(l):
    l = [float(x) for x in l]
    return LabeledPoint(l[0],l[1:])

a=autompg_f.map(lambda l: parse(l[0]))



In [0]:
dfWithoutSchema = spark.createDataFrame(a)



In [0]:
testData = autompg_f.map(LabeledPoint.parse)



In [0]:
from pyspark.sql import functions as sql_functions

def parsed_points(df):
    """Converts a DataFrame of comma separated unicode strings into a DataFrame of `LabeledPoints`.

    Args:
        df: DataFrame where each row is a comma separated unicode string. The first element in the string
            is the label and the remaining elements are the features.

    Returns:
        DataFrame: Each row is converted into a `LabeledPoint`, which consists of a label and
            features. To convert an RDD to a DataFrame, simply call toDF().
    """
    #<FILL IN>
    return (df.select(sql_functions.split(df.value, ',').alias('value')).map(lambda row: LabeledPoint(float(row['value'][0]), list(row['value'][1:]))).toDF())
    
#parsed_points_df = <FILL IN>
parsed_points_df = parsed_points(autompg_f1)
parsed_points_df.show()



In [0]:
autompg_f=autompg_f.rdd



In [0]:
autompg_f = spark.createDataFrame(autompg_f)
# Changing dataframe type in mpg Dataset
autompg = autompg.withColumn("mpg", autompg["mpg"].cast('float'))
autompg = autompg.withColumn("displacement", autompg["displacement"].cast('float'))
autompg = autompg.withColumn("horsepower", autompg["horsepower"].cast('float'))
autompg = autompg.withColumn("weight", autompg["weight"].cast('float'))


