In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('wine').getOrCreate()

In [3]:
data = spark.read.csv(r"C:\Users\Asus\Desktop\winequality-red.csv", inferSchema=True, header=True)

In [4]:
data.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [8]:
data.head(5)

[Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5),
 Row(fixed acidity=7.8, volatile acidity=0.88, citric acid=0.0, residual sugar=2.6, chlorides=0.098, free sulfur dioxide=25.0, total sulfur dioxide=67.0, density=0.9968, pH=3.2, sulphates=0.68, alcohol=9.8, quality=5),
 Row(fixed acidity=7.8, volatile acidity=0.76, citric acid=0.04, residual sugar=2.3, chlorides=0.092, free sulfur dioxide=15.0, total sulfur dioxide=54.0, density=0.997, pH=3.26, sulphates=0.65, alcohol=9.8, quality=5),
 Row(fixed acidity=11.2, volatile acidity=0.28, citric acid=0.56, residual sugar=1.9, chlorides=0.075, free sulfur dioxide=17.0, total sulfur dioxide=60.0, density=0.998, pH=3.16, sulphates=0.58, alcohol=9.8, quality=6),
 Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=1

In [9]:
df = data.toPandas()

In [15]:
df

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.700,0.00,1.9,0.076,11.0,34.0,0.99780,3.51,0.56,9.4,5
1,7.8,0.880,0.00,2.6,0.098,25.0,67.0,0.99680,3.20,0.68,9.8,5
2,7.8,0.760,0.04,2.3,0.092,15.0,54.0,0.99700,3.26,0.65,9.8,5
3,11.2,0.280,0.56,1.9,0.075,17.0,60.0,0.99800,3.16,0.58,9.8,6
4,7.4,0.700,0.00,1.9,0.076,11.0,34.0,0.99780,3.51,0.56,9.4,5
...,...,...,...,...,...,...,...,...,...,...,...,...
1594,6.2,0.600,0.08,2.0,0.090,32.0,44.0,0.99490,3.45,0.58,10.5,5
1595,5.9,0.550,0.10,2.2,0.062,39.0,51.0,0.99512,3.52,0.76,11.2,6
1596,6.3,0.510,0.13,2.3,0.076,29.0,40.0,0.99574,3.42,0.75,11.0,6
1597,5.9,0.645,0.12,2.0,0.075,32.0,44.0,0.99547,3.57,0.71,10.2,5


In [17]:
formula = 'quality ~ ' + ' + '.join(df.columns.difference(['quality']))

In [50]:
from pyspark.sql.functions import col

In [13]:
import statsmodels.formula.api as sfa


DataFrame[fixedacidity: double, volatileacidity: double, citricacid: double, residualsugar: double, chlorides: double, freesulfurdioxide: double, totalsulfurdioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int]

In [42]:
model = sfa.ols(formula=formula, data=df).fit()

In [43]:
model.summary()

0,1,2,3
Dep. Variable:,quality,R-squared:,0.361
Model:,OLS,Adj. R-squared:,0.356
Method:,Least Squares,F-statistic:,81.35
Date:,"Fri, 05 Apr 2024",Prob (F-statistic):,1.79e-145
Time:,14:08:26,Log-Likelihood:,-1569.1
No. Observations:,1599,AIC:,3162.0
Df Residuals:,1587,BIC:,3227.0
Df Model:,11,,
Covariance Type:,nonrobust,,

0,1,2,3,4,5,6
,coef,std err,t,P>|t|,[0.025,0.975]
Intercept,21.9652,21.195,1.036,0.300,-19.607,63.538
alcohol,0.2762,0.026,10.429,0.000,0.224,0.328
chlorides,-1.8742,0.419,-4.470,0.000,-2.697,-1.052
citric_acid,-0.1826,0.147,-1.240,0.215,-0.471,0.106
density,-17.8812,21.633,-0.827,0.409,-60.314,24.551
fixed_acidity,0.0250,0.026,0.963,0.336,-0.026,0.076
free_sulfur_dioxide,0.0044,0.002,2.009,0.045,0.000,0.009
pH,-0.4137,0.192,-2.159,0.031,-0.789,-0.038
residual_sugar,0.0163,0.015,1.089,0.276,-0.013,0.046

0,1,2,3
Omnibus:,27.376,Durbin-Watson:,1.757
Prob(Omnibus):,0.0,Jarque-Bera (JB):,40.965
Skew:,-0.168,Prob(JB):,1.27e-09
Kurtosis:,3.708,Cond. No.,113000.0


In [44]:
import statsmodels.formula.api as smf

def forward_selection(data, response):
    remaining = set(data.columns)
    remaining.remove(response)
    selected = []
    current_score, best_new_score = float('inf'), float('inf')
    while remaining and current_score == best_new_score:
        scores_with_candidates = []
        for candidate in remaining:
            formula = "{} ~ {}".format(response, ' + '.join(selected + [candidate]))
            score = smf.ols(formula, data).fit().aic
            scores_with_candidates.append((score, candidate))
        scores_with_candidates.sort()
        best_new_score, best_candidate = scores_with_candidates.pop(0)
        if current_score > best_new_score:
            remaining.remove(best_candidate)
            selected.append(best_candidate)
            current_score = best_new_score
    formula = "{} ~ {}".format(response, ' + '.join(selected))
    model = smf.ols(formula, data).fit()
    return model, selected

model, selected_columns = forward_selection(df, 'quality')



In [45]:
from pyspark.ml.feature import VectorAssembler

In [86]:
data.columns

['fixedacidity',
 'volatileacidity',
 'citricacid',
 'residualsugar',
 'chlorides',
 'freesulfurdioxide',
 'totalsulfurdioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [88]:
assembler = VectorAssembler(inputCols=['fixedacidity',
 'volatileacidity',
 'citricacid',
 'residualsugar',
 'chlorides',
 'freesulfurdioxide',
 'totalsulfurdioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol'], outputCol='features')

In [89]:
trans = assembler.transform(data)

In [90]:
trans.describe()

DataFrame[summary: string, fixedacidity: string, volatileacidity: string, citricacid: string, residualsugar: string, chlorides: string, freesulfurdioxide: string, totalsulfurdioxide: string, density: string, pH: string, sulphates: string, alcohol: string, quality: string]

In [91]:
trans.head()

Row(fixedacidity=7.4, volatileacidity=0.7, citricacid=0.0, residualsugar=1.9, chlorides=0.076, freesulfurdioxide=11.0, totalsulfurdioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5, features=DenseVector([7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4]))

In [100]:
from pyspark.ml.classification import RandomForestClassifier

In [93]:
from pyspark.sql.functions import countDistinct

In [94]:
filt_data = trans.select('features', 'quality')

In [95]:
train_data, test_data = filt_data.randomSplit([0.7, 0.3])

In [101]:
rfc = RandomForestClassifier(labelCol='quality', featuresCol='features', numTrees=10)

In [102]:
modelRFC = rfc.fit(train_data)

In [103]:
pred = modelRFC.transform(test_data)

In [108]:
pred.select('quality', 'prediction').where(pred.quality != pred.prediction).agg()

NameError: name 'count' is not defined

In [96]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial",labelCol='quality', featuresCol='features')

In [97]:
model = lr.fit(train_data)

In [98]:
pred = model.transform(test_data)

In [99]:
pred.show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|quality|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[5.0,0.74,0.0,1.2...|      6|[-3.1607145834237...|[5.47822650721164...|       5.0|
|[5.2,0.32,0.25,1....|      5|[-3.1607145834237...|[5.47822650721164...|       5.0|
|[5.2,0.34,0.0,1.8...|      6|[-3.1607145834237...|[5.47822650721164...|       5.0|
|[5.2,0.48,0.04,1....|      7|[-3.1607145834237...|[5.47822650721164...|       5.0|
|[5.2,0.49,0.26,2....|      6|[-3.1607145834237...|[5.47822650721164...|       5.0|
|[5.3,0.47,0.11,2....|      7|[-3.1607145834237...|[5.47822650721164...|       5.0|
|[5.4,0.58,0.08,1....|      6|[-3.1607145834237...|[5.47822650721164...|       5.0|
|[5.4,0.74,0.0,1.2...|      6|[-3.1607145834237...|[5.47822650721164...|       5.0|
|[5.4,0.74,0.09,1....|      6|[-3.1607145834237...|[5.47822650721164...|    

In [2]:
data = spark.read.csv(r"C:\Users\Asus\Desktop\onlinefoods.csv", inferSchema = True, header = True)

NameError: name 'spark' is not defined

In [117]:
data.printSchema

<bound method DataFrame.printSchema of DataFrame[Age: int, Gender: string, Marital Status: string, Occupation: string, Monthly Income: string, Educational Qualifications: string, Family size: int, latitude: double, longitude: double, Pin code: int, Output: string, Feedback: string, _c12: string]>

In [1]:
data.head()

NameError: name 'data' is not defined

In [119]:
data.head(5)

[Row(Age=20, Gender='Female', Marital Status='Single', Occupation='Student', Monthly Income='No Income', Educational Qualifications='Post Graduate', Family size=4, latitude=12.9766, longitude=77.5993, Pin code=560001, Output='Yes', Feedback='Positive', _c12='Yes'),
 Row(Age=24, Gender='Female', Marital Status='Single', Occupation='Student', Monthly Income='Below Rs.10000', Educational Qualifications='Graduate', Family size=3, latitude=12.977, longitude=77.5773, Pin code=560009, Output='Yes', Feedback='Positive', _c12='Yes'),
 Row(Age=22, Gender='Male', Marital Status='Single', Occupation='Student', Monthly Income='Below Rs.10000', Educational Qualifications='Post Graduate', Family size=3, latitude=12.9551, longitude=77.6593, Pin code=560017, Output='Yes', Feedback='Negative ', _c12='Yes'),
 Row(Age=22, Gender='Female', Marital Status='Single', Occupation='Student', Monthly Income='No Income', Educational Qualifications='Graduate', Family size=6, latitude=12.9473, longitude=77.5616, Pin