In [1]:
import os
os.environ["SPARK_VERSION"] = "3.1"
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

from pyspark.sql import SparkSession

from pyspark.sql.functions import count, round, concat
from pyspark.sql.functions import mean, min, max, sum, datediff, to_date
from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, lit, datediff, col
import pyspark.sql.functions as func
from pyspark.sql.functions import when


In [2]:
#!pip install pydeequ

import pydeequ
from pydeequ.checks import *
from pydeequ.verification import *
from pydeequ.analyzers import *
from pydeequ.anomaly_detection import *
from pydeequ.profiles import *

In [3]:
# Create (or reuse) the Spark session for this notebook; the application is
# registered under the name "testData".
spark = (
    SparkSession.builder
    .appName("testData")
    .getOrCreate()
)

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Load the CSV, treating its first row as the header. No schema is supplied
# and schema inference is not requested, so Spark reads every column as a string.
data = spark.read.option("header", "true").csv("testData.csv")

In [None]:
# Collect the whole Spark DataFrame to the driver as a pandas DataFrame.
df = data.toPandas()

In [None]:
# Preview the first rows of the pandas copy.
df.head()

In [None]:
# Print the first five rows of the Spark DataFrame.
data.show(5)

In [None]:
# Print the column names and Spark types as loaded from the CSV.
data.printSchema()

In [None]:
def my_count(df_in):
    """Display a single row holding the non-null count of every column.

    Each aggregate is aliased back to its source column name, so the printed
    table carries the same header as ``df_in``. Nothing is returned.

    Args:
        df_in: Spark DataFrame whose columns are counted.
    """
    per_column_counts = []
    for column_name in df_in.columns:
        per_column_counts.append(count(column_name).alias(column_name))
    summary = df_in.agg(*per_column_counts)
    summary.show()
    
# Per-column non-null counts of the freshly loaded data.
my_count(data)

In [None]:
# Discard every row that has a null in any column, then display the
# per-column counts again.
data = data.na.drop()
my_count(data)

In [None]:
# Round every numeric column to 3 decimals and cast it to integer.
#
# DataFrame.dtypes yields (column name, Spark type string) pairs. The check
# must be made against Spark's type names: comparing a Spark DataType object
# with the Python builtins `int` / `float` never matches, which would make
# this loop silently do nothing.
#
# NOTE(review): when the CSV is loaded without a schema every column is a
# string, so no column qualifies here. Also confirm that truncating
# fractional columns to integer is really what is wanted.
numeric_type_names = ("tinyint", "smallint", "int", "bigint", "float", "double")
for column_name, type_name in data.dtypes:
    if type_name in numeric_type_names:
        data = data.withColumn(
            column_name, func.round(data[column_name], 3).cast('integer')
        )


In [None]:
# Re-print the schema to see the column types after the cast step.
data.printSchema()

In [None]:
# Print the first two rows of the current DataFrame.
data.show(2)

In [None]:
# Summary statistics, as produced by describe(), for the first nine columns.
cols = data.columns[:9]
(
    data.select(*cols)
    .describe()
    .show()
)

In [None]:
# Number of rows per distinct price, listed in ascending price order.
(
    data.groupBy("price")
    .count()
    .orderBy("price")
    .show()
)

In [None]:
def histogram(df, col, bins=10, xname=None, yname=None):
    """Plot a histogram of one column of a Spark DataFrame.

    Bucket counts are computed by Spark through ``RDD.histogram``; only the
    bucket edges and counts are used for plotting.

    Args:
        df: Spark DataFrame holding the column.
        col: Name of the column to bin.
        bins: Bucket specification forwarded unchanged to ``RDD.histogram``.
        xname: X-axis label; falls back to ``col`` when omitted.
        yname: Y-axis label; the axis is left unlabeled when omitted.
    """
    edges, counts = df.select(col).rdd.flatMap(lambda row: row).histogram(bins)

    # One width per bucket, so unevenly spaced edges are drawn correctly too.
    widths = [right - left for left, right in zip(edges[:-1], edges[1:])]

    # Anchor each bar at the left edge of its bucket. Centring bars on the
    # right edge instead would shift the whole plot by half a bucket.
    plt.bar(edges[:-1], counts, width=widths, align='edge')
    plt.xlabel(col if xname is None else xname)
    plt.ylabel('' if yname is None else yname)
    plt.show()

In [None]:

# histogram(data, 'price', bins=500,xname='price', yname='state')

In [None]:
# from pyspark.sql import functions as F

# # Perform aggregation
# aggregated_data = data.agg(F.avg('acre_lot').alias('avg_acre_lot'), F.avg('house_size').alias('avg_house_size')) \
#     .sort('avg_acre_lot')

# # Show the aggregated data
# aggregated_data.show()


In [None]:
def lineplot(df, x, y, measure='mean'):
    """Plot an aggregate of ``y`` per distinct value of ``x`` as a line.

    The aggregation runs in Spark; the grouped result is converted to pandas,
    sorted by ``x`` and plotted.

    Args:
        df: Spark DataFrame holding both columns.
        x: Name of the column to group by (x axis).
        y: Name of the column to aggregate; also used as the y-axis label.
        measure: ``'mean'``, ``'total'`` (sum) or ``'count'`` (rows per group).

    Raises:
        ValueError: If ``measure`` is not one of the supported values.
    """
    # measure -> (aggregation on the grouped data, name of the result column)
    aggregations = {
        'mean': (lambda grouped: grouped.mean(y), 'avg({})'.format(y)),
        'total': (lambda grouped: grouped.sum(y), 'sum({})'.format(y)),
        'count': (lambda grouped: grouped.count(), 'count'),
    }
    # Validate before touching Spark so a typo fails fast instead of
    # producing an empty figure.
    if measure not in aggregations:
        raise ValueError(
            "Unknown measure {!r}; expected one of 'mean', 'total', 'count'".format(measure)
        )

    aggregate, value_column = aggregations[measure]
    pd_df = aggregate(df.groupBy(x)).toPandas().sort_values(by=x)
    pd_df.plot(x, value_column, legend=False)
    plt.ylabel(y)
    plt.show()

In [None]:
# lineplot(data, 'price', 'bed', measure='mean')

In [None]:
# lineplot(data, 'acre_lot', 'price', measure='mean')

In [None]:
# Count how many distinct values the 'status' column takes. The grouped frame
# is consumed exactly once, so it is neither sorted nor cached.
area_lot_count = data.groupBy('status').count().count()
print('Found %d distinct status values' % area_lot_count)


In [None]:
# histogram(data, 'status', bins=300)

In [None]:
# Normalise the column names of `data`: remove every whitespace character
# (leading, trailing and inner) and every dot.
# A comprehension is used so that no loop variable leaks into the notebook
# namespace; a leaked `col` would shadow pyspark's `col` function.
tempList = ["".join(name.split()).replace('.', '') for name in data.columns]
print(tempList)

data = data.toDF(*tempList)

In [None]:
# Overwrite 'status' with acre_lot3 divided by bed, rounded to two decimals,
# then print the first ten rows.
# NOTE(review): assumes the loaded data has an 'acre_lot3' column — confirm.
per_bed_ratio = data['acre_lot3'] / data['bed']
data = data.withColumn('status', func.round(per_bed_ratio, 2))
data.show(n=10)

In [None]:
# Print the rows that satisfy all three conditions, then the first two rows
# of the unfiltered DataFrame.
data.where(
    (data['status'] == 0.01)
    & (data['price'] > 1100)
    & (data['acre_lot7'] > 1190)
).show()
data.show(n=2)

In [None]:
# Recode 'status' as a 0/1 flag.
# NOTE(review): any row with acre_lot7 == 4000 also satisfies acre_lot7 > 0.1,
# so the first branch never changes the outcome — confirm the intended rule.
data = data.withColumn(
    'status',
    when((data['price'] > 10000) & (data['acre_lot7'] == 4000), 1)
    .when(data['acre_lot7'] > 0.1, 1)
    .otherwise(0),
)

data.head(2)

In [None]:
def pieplot(df, col, lim=10, yname=None):
    """Draw a pie chart of the most frequent values of a column.

    Args:
        df: Spark DataFrame to summarise.
        col: Name of the column whose values are counted.
        lim: Maximum number of slices; only the ``lim`` most frequent values
            are converted to pandas and plotted.
        yname: Y-axis label; the axis is left unlabeled when omitted.
    """
    classes = df.groupBy(col).count().orderBy('count', ascending=False)

    # Only the top `lim` (value, count) rows are brought back as pandas.
    pd_df = classes.limit(lim).toPandas()

    pd_df.plot(kind='pie', x=col, y='count',
               labels=pd_df[col], legend=False)
    # Replace whatever y label the pandas plot set with the caller's label
    # (blank by default).
    plt.ylabel('' if yname is None else yname)
    plt.show()

In [None]:
# Pie chart of acre_lot7 over the rows where acre_lot7 equals 0.1 and price
# exceeds 1400.
data_graph = data.where(
    (data['acre_lot7'] == 0.1)
    & (data['price'] > 1400)
)
pieplot(data_graph, 'acre_lot7')

In [None]:
def boxplot(df, col, group_by):
    """Draw box plots of ``col`` split by ``group_by``.

    Args:
        df: Spark DataFrame holding the data.
        col: Name of the column whose distribution is plotted; also used as
            the y-axis label.
        group_by: Column name, or list of column names, to group by.
    """
    value_cols = [col] if isinstance(col, str) else list(col)
    group_cols = [group_by] if isinstance(group_by, str) else list(group_by)
    # Collect only the columns the plot needs (de-duplicated, order kept)
    # instead of pulling every column of the DataFrame onto the driver.
    needed = list(dict.fromkeys(value_cols + group_cols))
    pd_df = df.select(*needed).toPandas()
    pd_df.boxplot(col, by=group_by, figsize=(8, 5))
    plt.ylabel(col)
    plt.title(None)
    plt.show()

In [None]:
# boxplot(data, 'price','city')

In [None]:
# # Initialization of a test
# check = Check(spark, CheckLevel.Error, "Integrity checks")

# # testData overview / data testing
# checkResult = VerificationSuite(spark) \
#     .onData(data) \
#     .addCheck(
#         check.hasSize(lambda x: x >= 50) \
#         .hasMin("price", lambda x: x > 0) \
#         .isComplete("status")  \
#         .isUnique("prev_sold_date")  \
#         .isNonNegative("price")) \
#     .run()

# # Running verification
# checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
# checkResult_df.show()

# # Describing the result
# if checkResult.status == "Success":
#     print('Tests passed')
# else:
#     print('Errors found:')

#     for check_json in checkResult.checkResults:
#         if check_json['constraint_status'] != "Success":
#             print(f"\t{check_json['constraint']} reason: {check_json['constraint_message']}")


In [None]:
# analysisResult = AnalysisRunner(spark) \
#                     .onData(data) \
#                     .addAnalyzer(Size()) \
#                     .addAnalyzer(Completeness("price")) \
#                     .addAnalyzer(ApproxCountDistinct("city")) \
#                     .addAnalyzer(Mean("acre_lot")) \
#                     .addAnalyzer(Compliance("acre_lot", "acre_lot >= 140.0")) \
#                     .addAnalyzer(Correlation("bed", "bath")) \
#                     .run()
                    
# analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
# analysisResult_df.show()

In [None]:
# result = ColumnProfilerRunner(spark) \
#     .onData(data) \
#     .run()

# for col, profile in result.profiles.items():
#     print(profile)

In [None]:
# anomalyResult = VerificationSuite(spark) \
#     .onData(data) \
#     .addAnomalyCheck(BatchNormalStrategy(lowerDeviationFactor=3.0, upperDeviationFactor=3.0, includeInterval=False)) \
#     .run()

# anomalyResult = VerificationResult.checkResultsAsDataFrame(spark, anomalyResult)
# anomalyResult.show()