## Create Delta Table

In [0]:
%sql
-- Target Delta table for the COPY INTO examples in this notebook.
-- IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE fails
-- when the table already exists from an earlier run.
CREATE TABLE IF NOT EXISTS quickstart_catalog.quickstart_schema.users_int (
  id INT,
  name STRING,
  dob DATE,
  email STRING,
  gender STRING,
  country STRING,
  region STRING,
  city STRING,
  asset INT,
  marital_status STRING
) USING DELTA;

-- Show the column list together with the detailed table metadata.
DESCRIBE EXTENDED quickstart_catalog.quickstart_schema.users_int;

## Incremental data ingestion using the COPY INTO command (manual schema evolution)

In [0]:
%sql
-- Load the staged CSV files into users_int, casting every source column
-- to the type expected by the target table.
COPY INTO quickstart_catalog.quickstart_schema.users_int
FROM (
  SELECT
    CAST(id AS INT)                AS id,
    CAST(name AS STRING)           AS name,
    to_date(dob, 'yyyy-MM-dd')     AS dob,
    CAST(email AS STRING)          AS email,
    CAST(gender AS STRING)         AS gender,
    CAST(country AS STRING)        AS country,
    CAST(region AS STRING)         AS region,
    CAST(city AS STRING)           AS city,
    CAST(asset AS INT)             AS asset,
    CAST(marital_status AS STRING) AS marital_status,
    CAST(education AS STRING)      AS education
  FROM '/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/user_staging/'
)
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true')

#### The command above loads only new (incremental) files; if you run it again without uploading a new file, nothing is loaded.
#### If you upload a file with an additional column, it does not raise an error; the extra column is simply ignored. If you check with DESCRIBE TABLE, no new column has been added.

In [0]:
%sql
-- Inspect the rows whose id is above 5000.
SELECT *
FROM quickstart_catalog.quickstart_schema.users_int
WHERE id > 5000

In [0]:
%sql
-- List the table's current columns and their types.
DESCRIBE TABLE
  quickstart_catalog.quickstart_schema.users_int

## Alter the table manually

### Add the column, then upload the file, to get the new column in the table

In [0]:
%sql

-- Manual schema evolution: add the education column to the table.
ALTER TABLE quickstart_catalog.quickstart_schema.users_int
  ADD COLUMNS (education STRING);

-- Show the table definition after the change.
DESCRIBE FORMATTED quickstart_catalog.quickstart_schema.users_int;

## Automated Schema Evolution

In [0]:
%sql
 
-- Start from a clean slate. IF EXISTS keeps the cell re-runnable: a plain
-- DROP TABLE fails when the table is not there (e.g. on a fresh workspace).
DROP TABLE IF EXISTS quickstart_catalog.quickstart_schema.users_int;

-- Recreate the table without any column definitions.
CREATE TABLE IF NOT EXISTS quickstart_catalog.quickstart_schema.users_int;

-- Show the definition of the freshly created table.
DESCRIBE FORMATTED quickstart_catalog.quickstart_schema.users_int;
 

### Loaded using mergeSchema, so new columns are added to the table automatically

In [0]:
%sql

-- Load the CSV files from the user_staging_new folder with schema inference
-- and mergeSchema enabled on both the reader and the copy.
COPY INTO quickstart_catalog.quickstart_schema.users_int
FROM '/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/user_staging_new/'
FILEFORMAT = CSV
FORMAT_OPTIONS (
  'header' = 'true',
  'inferSchema' = 'true',
  'mergeSchema' = 'true'
)
COPY_OPTIONS ('mergeSchema' = 'true')

 
 

In [0]:
%sql
-- Count the rows currently in the table.
SELECT COUNT(*)
FROM quickstart_catalog.quickstart_schema.users_int;

# Data Streaming in Delta Table

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode
from pyspark.sql.types import StructType, StructField, StringType

# Session setup kept for reference only; it is not executed here.
# NOTE(review): this cell uses `spark` without creating it, so it assumes a
# session is already in scope (presumably provided by the notebook) - confirm.
#
# if __name__ == '__main__':
#     spark = (SparkSession.builder
#              .appName("Streaming App")
#              .master("local")
#              .config("spark.sql.shuffle.partitions",10)
#              .getOrCreate())

# The code below used to be indented under the disabled `if` block above,
# which made the whole cell fail with an IndentationError. It now runs at
# top level.

# Explicit schema for the crime CSV files: three string columns.
CRIME_SCHEMA = StructType(
    [
        StructField("code", StringType()),
        StructField("region", StringType()),
        StructField("category", StringType())
    ]
)

# Read the CSV files under ./crime_data/input/ as a stream.
input_df = spark.readStream.load(format="CSV", path="./crime_data/input/", schema=CRIME_SCHEMA)

# Keep only the records for the "Downtown" region.
result_df = input_df.filter(col("region")=="Downtown")

# Print the filtered stream to the console and block until the query stops.
result_df.writeStream.start(format="console", outputMode="APPEND").awaitTermination()

## Write output to the data lake

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
 
# Explicit schema for the crime CSV files: three string columns.
CRIME_SCHEMA = StructType(
    [
        StructField("code", StringType()),
        StructField("region", StringType()),
        StructField("category", StringType()),
    ]
)

# Read the CSV files in the input volume folder as a stream.
input_df = (
    spark.readStream.format("csv")
    .schema(CRIME_SCHEMA)
    .load(
        "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data/input/"
    )
)

# A stray copy of the input path used to sit on its own line here and made
# the cell fail with a SyntaxError; it has been removed.

# Keep only the records for the "Downtown" region.
result_df = input_df.filter(col("region") == "Downtown")

# Write the filtered stream in Delta format to the output folder, using the
# availableNow trigger and a checkpoint folder, then block until the query
# terminates.
(
    result_df.writeStream.format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option(
        "checkpointLocation",
        "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data/checkpoint",
    )
    .option(
        "path",
        "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data/output/",
    )
    .start()
    .awaitTermination()
)

## Write to a Delta table

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
 
def ingest_data():
    """Stream "Downtown" crime records from the CSV input folder into a Delta table.

    Reads the CSV files under the crime_data/input volume folder as a stream
    with a fixed three-column string schema (code, region, category), keeps
    the rows whose region is "Downtown", and appends them to the table
    quickstart_catalog.quickstart_schema.crime_tbl using the availableNow
    trigger and the checkpoint_01 checkpoint folder.

    Relies on a `spark` session being available in the enclosing scope.
    The function does not call awaitTermination() on the query and returns
    None.
    """
    source_dir = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data/input/"
    checkpoint_dir = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/crime_data/checkpoint_01"
    target_table = "quickstart_catalog.quickstart_schema.crime_tbl"

    # Every column is read as a string.
    crime_schema = StructType(
        [
            StructField(field_name, StringType())
            for field_name in ("code", "region", "category")
        ]
    )

    raw_stream = spark.readStream.format("csv").schema(crime_schema).load(source_dir)
    downtown_stream = raw_stream.filter(col("region") == "Downtown")

    writer = (
        downtown_stream.writeStream.format("delta")
        .outputMode("append")
        .trigger(availableNow=True)
        .option("checkpointLocation", checkpoint_dir)
    )
    writer.toTable(target_table)



In [0]:
# Run the streaming ingestion implemented by ingest_data().
ingest_data()