In [33]:
from delta import *
import pyspark
from pyspark import SparkContext, SparkConf, SQLContext
from dotenv import load_dotenv
import os

load_dotenv()

spark = pyspark.sql.SparkSession.builder.appName("DeltaLake") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0,org.apache.hadoop:hadoop-aws:3.2.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.path.style.access", True) \
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("MINIO_USER")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("MINIO_PASSWORD")) \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("com.amazonaws.services.s3.enableV4", True) \
    .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true") \
    .getOrCreate()

#spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [34]:
print(os.getenv("MINIO_USER"))
print(os.getenv("MINIO_PASSWORD"))

admin
superadmin


In [35]:
import boto3

s3 = boto3.client(
    service_name="s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id=os.getenv("MINIO_USER"),
    aws_secret_access_key=os.getenv("MINIO_PASSWORD")
)

s3Resource = boto3.resource(
    service_name="s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id=os.getenv("MINIO_USER"),
    aws_secret_access_key=os.getenv("MINIO_PASSWORD")
)

if s3Resource.Bucket("apt").creation_date is None:
    s3.create_bucket(Bucket="apt")
    df = spark.read.csv("../data/apt_data.csv")
    df.write.format("delta").save("s3a://apt/delta")

In [36]:
# Bucket needs to be created first

df = spark.read.format("delta").load("s3a://apt/delta")
df.toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7
0,id,title,price,number_of_room,apt_size,district,city,street_address
1,0,"3 Zimmer Wohnung zu vermieten, Top 5",€ 648,3 Zimmer,"56,82 m",Bludenz,Bludesch,", Mühleplatz 5"
2,2,GENIALE - 2 Zimmer EG - Wohnung in Nüziders zu...,€ 792,2 Zimmer,58 m,Bludenz,Nüziders,", Im Hag 41"
3,4,Traumhafte 2 Zimmer Wohnung im EG zu vermieten...,€ 854,2 Zimmer,"46,7 m",Bludenz,Bludesch,", Farbgasse 11"
4,6,2-Zimmer Dachgeschosswohnung!,€ 790,2 Zimmer,50 m,Bregenz,Bregenz,", Ammianusstraße 2"
...,...,...,...,...,...,...,...,...
382,1038,Ferienapartment Gurtisspitze,"€ 747,34",2 Zimmer,43 m,Bludenz,Gurtis,", Bazora 1"
383,1040,4-Zimmer Wohnung im Zentrum von Dornbirn,€ 1.660,4 Zimmer,"95,24 m",Dornbirn,Dornbirn,", Poststraße 1"
384,1044,Gemütliche 2-Zimmer-Wohnung in Muntlix,"€ 642,55",2 Zimmer,38 m,Feldkirch,Muntlix,", Arkenstraße"
385,1046,Neue 3-Zimmerwohnung W5 im 2. OG Dornbirn mit ...,"€ 1.737,96",3 Zimmer,"94,61 m",Dornbirn,Dornbirn,", Bergmannstraße"


In [37]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+---+---+---+---+---+---+---+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|
+---+---+---+---+---+---+---+---+
|  0|  0|  0|  0|  0|  0|  0|  0|
+---+---+---+---+---+---+---+---+



In [38]:
clean = df.na.drop("any")

In [39]:
clean.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
clean.toPandas()

+---+---+---+---+---+---+---+---+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|
+---+---+---+---+---+---+---+---+
|  0|  0|  0|  0|  0|  0|  0|  0|
+---+---+---+---+---+---+---+---+



Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7
0,id,title,price,number_of_room,apt_size,district,city,street_address
1,0,"3 Zimmer Wohnung zu vermieten, Top 5",€ 648,3 Zimmer,"56,82 m",Bludenz,Bludesch,", Mühleplatz 5"
2,2,GENIALE - 2 Zimmer EG - Wohnung in Nüziders zu...,€ 792,2 Zimmer,58 m,Bludenz,Nüziders,", Im Hag 41"
3,4,Traumhafte 2 Zimmer Wohnung im EG zu vermieten...,€ 854,2 Zimmer,"46,7 m",Bludenz,Bludesch,", Farbgasse 11"
4,6,2-Zimmer Dachgeschosswohnung!,€ 790,2 Zimmer,50 m,Bregenz,Bregenz,", Ammianusstraße 2"
...,...,...,...,...,...,...,...,...
382,1038,Ferienapartment Gurtisspitze,"€ 747,34",2 Zimmer,43 m,Bludenz,Gurtis,", Bazora 1"
383,1040,4-Zimmer Wohnung im Zentrum von Dornbirn,€ 1.660,4 Zimmer,"95,24 m",Dornbirn,Dornbirn,", Poststraße 1"
384,1044,Gemütliche 2-Zimmer-Wohnung in Muntlix,"€ 642,55",2 Zimmer,38 m,Feldkirch,Muntlix,", Arkenstraße"
385,1046,Neue 3-Zimmerwohnung W5 im 2. OG Dornbirn mit ...,"€ 1.737,96",3 Zimmer,"94,61 m",Dornbirn,Dornbirn,", Bergmannstraße"


In [40]:
clean.write.format("delta").mode("overwrite").save("s3a://apt/delta")