### Let's Begin Ingesting Some Data

In [0]:
%pip install azureml-opendatasets

In [0]:
from azureml.opendatasets import NoaaIsdWeather
from datetime import datetime
from dateutil import parser
from dateutil.relativedelta import relativedelta

In [0]:
# Reset state left over from an earlier run: drop the `ingest_bronze` database
# (CASCADE also drops the tables registered in it) and recursively delete the
# landing folder that the CSV files are written to.
# NOTE(review): the Auto Loader table/checkpoint under dbfs:/tmp/ingestAutoloader
# is not removed here — confirm whether a rerun is expected to start from empty.
# Disabled leftover from an earlier version of the demo:
#spark.sql("DROP TABLE copyIntoTable")
spark.sql("DROP DATABASE if exists ingest_bronze cascade")
dbutils.fs.rm("/tmp/ingestGettingStartedDemo", recurse = True)

In [0]:
# Create the landing-zone folder that the raw CSV files are dropped into.
#
# `basePath` is the DBFS folder used as the landing zone. The folder is probed
# with `dbutils.fs.ls`: a failed listing is taken to mean the folder does not
# exist yet, so it is created; a successful listing means it already exists and
# the cell aborts so that existing content is never silently reused.
basePath = "/tmp/ingestGettingStartedDemo"
try:
  dbutils.fs.ls(basePath)
except Exception:
  # Catch Exception rather than using a bare `except:` so that
  # KeyboardInterrupt / SystemExit (e.g. cancelling the cell) still propagate.
  dbutils.fs.mkdirs(basePath)
else:
  raise Exception("The folder " + basePath + " already exists, this notebook will remove it at the end, please change the basePath or remove the folder first")

In [0]:
# Make sure the landing folder exists before any file is written into it.
# NOTE(review): this is the same path as the default `basePath`, which the
# previous cell already creates, so the call looks redundant for the default
# configuration — confirm before removing, since later cells hard-code this path.
dbutils.fs.mkdirs("/tmp/ingestGettingStartedDemo")

In [0]:

# Date window passed to the NOAA ISD open dataset: 2020-05-01 through 2020-05-10.
start_date = parser.parse('2020-5-1')
end_date = parser.parse('2020-5-10')

# Load the dataset as a Spark DataFrame, collect it to the driver as a pandas
# DataFrame, and write it out as a single CSV file in the landing folder
# (the /dbfs/... path shows up as dbfs:/tmp/ingestGettingStartedDemo/ in listings).
# NOTE(review): pandas `to_csv` returns None when given a path, so `pdf` ends up
# as None rather than a DataFrame — confirm nothing relies on it.
# NOTE(review): the pandas index is written as an unnamed first column; this is
# presumably the `_c0` column that appears in the ingested tables further down.
isd = NoaaIsdWeather(start_date, end_date)
pdf = isd.to_spark_dataframe().toPandas().to_csv("/dbfs/tmp/ingestGettingStartedDemo/May_2020_pandas.csv")

In [0]:
%fs ls /tmp/ingestGettingStartedDemo/

path,name,size
dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv,May_2020_pandas.csv,417146225


# COPY INTO

In [0]:
%sql
-- Create the bronze database (backed by an explicit DBFS location) if it is
-- missing, make it the current database, and list the tables it contains.
CREATE DATABASE IF NOT EXISTS ingest_bronze
  LOCATION 'dbfs:/tmp/copy_into/copyfolder/user';

USE ingest_bronze;

SHOW TABLES;

database,tableName,isTemporary


In [0]:
%sql
-- Load the CSV files found in the landing folder into the path-based Delta
-- table, treating the first line of each file as the header row.
COPY INTO delta.`dbfs:/tmp/copy_into/copyfolder/user`
FROM 'dbfs:/tmp/ingestGettingStartedDemo/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true');

num_affected_rows,num_inserted_rows
3169936,3169936


In [0]:
%fs ls dbfs:/tmp/copy_into/copyfolder/user

path,name,size
dbfs:/tmp/copy_into/copyfolder/user/_delta_log/,_delta_log/,0
dbfs:/tmp/copy_into/copyfolder/user/part-00000-c80d9838-0a7a-4e73-b8f1-d00d0e446fba-c000.snappy.parquet,part-00000-c80d9838-0a7a-4e73-b8f1-d00d0e446fba-c000.snappy.parquet,9207701
dbfs:/tmp/copy_into/copyfolder/user/part-00001-fe870683-7c69-4c38-a6d4-238c1f16db53-c000.snappy.parquet,part-00001-fe870683-7c69-4c38-a6d4-238c1f16db53-c000.snappy.parquet,9197976
dbfs:/tmp/copy_into/copyfolder/user/part-00002-25901373-3c44-4367-ab79-72c27aa25af9-c000.snappy.parquet,part-00002-25901373-3c44-4367-ab79-72c27aa25af9-c000.snappy.parquet,9201752
dbfs:/tmp/copy_into/copyfolder/user/part-00003-4b86a6e9-ec51-4ab1-8368-4e3f6497a99c-c000.snappy.parquet,part-00003-4b86a6e9-ec51-4ab1-8368-4e3f6497a99c-c000.snappy.parquet,9215690
dbfs:/tmp/copy_into/copyfolder/user/part-00004-826afa9e-c652-4873-ba3d-aa96e8c970f0-c000.snappy.parquet,part-00004-826afa9e-c652-4873-ba3d-aa96e8c970f0-c000.snappy.parquet,9175517
dbfs:/tmp/copy_into/copyfolder/user/part-00005-47e2d310-b89d-43c8-9d09-fba4185ec9e3-c000.snappy.parquet,part-00005-47e2d310-b89d-43c8-9d09-fba4185ec9e3-c000.snappy.parquet,9204602
dbfs:/tmp/copy_into/copyfolder/user/part-00006-8e1afec5-1762-4356-bbe7-8c94cf411879-c000.snappy.parquet,part-00006-8e1afec5-1762-4356-bbe7-8c94cf411879-c000.snappy.parquet,9229089
dbfs:/tmp/copy_into/copyfolder/user/part-00007-83bf8af1-cd2a-4f24-989e-4aa4ecb4bd95-c000.snappy.parquet,part-00007-83bf8af1-cd2a-4f24-989e-4aa4ecb4bd95-c000.snappy.parquet,8632238


In [0]:
%sql
-- Register the Delta files written by COPY INTO as the table `weather`
-- so they can be queried by name.
CREATE TABLE weather
USING DELTA
LOCATION 'dbfs:/tmp/copy_into/copyfolder/user';

In [0]:
%sql
-- Preview the rows loaded into the weather table.
SELECT *
FROM weather

_c0,usaf,wban,datetime,latitude,longitude,elevation,windAngle,windSpeed,temperature,seaLvlPressure,cloudCoverage,presentWeatherIndicator,pastWeatherIndicator,precipTime,precipDepth,snowDepth,stationName,countryOrRegion,p_k,year,day,version,month
0,999999,4125,2020-05-09 12:45:00,44.556,-119.646,684.0,,,8.0,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,9,1.0,5
1,999999,4125,2020-05-01 21:05:00,44.556,-119.646,684.0,,,21.6,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,1,1.0,5
2,999999,22016,2020-05-04 08:10:00,29.348,-103.209,1065.0,,,28.5,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,4,1.0,5
3,999999,4125,2020-05-05 07:59:00,44.556,-119.646,684.0,,,,,,,,24.0,0.0,,JOHN DAY 35 WNW,US,999999-04125,2020,5,1.0,5
4,999999,53182,2020-05-02 00:35:00,36.568,-101.61,1000.0,,,30.0,,,,,,,,,,999999-53182,2020,2,1.0,5
5,999999,22016,2020-05-09 02:05:00,29.348,-103.209,1065.0,,,20.3,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,9,1.0,5
6,999999,4125,2020-05-07 19:40:00,44.556,-119.646,684.0,,,16.0,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,7,1.0,5
7,999999,53182,2020-05-08 12:20:00,36.568,-101.61,1000.0,,,6.3,,,,,,,,,,999999-53182,2020,8,1.0,5
8,999999,22016,2020-05-08 12:40:00,29.348,-103.209,1065.0,,,24.9,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,8,1.0,5
9,999999,22016,2020-05-04 23:05:00,29.348,-103.209,1065.0,,,36.4,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,4,1.0,5


In [0]:
# Locations used by the Auto Loader cells: the folder that is watched for new
# CSV files, the Delta table the stream writes to, and the checkpoint folder
# (a sub-folder of the table path) that also holds the inferred schema.
# NOTE(review): Delta's VACUUM only skips sub-directories starting with `_` or
# `.`; a checkpoint kept inside the table directory is usually named
# `_checkpoint` for that reason — confirm whether VACUUM is ever run here.
landingZoneLocation = "dbfs:/tmp/ingestGettingStartedDemo/"
autoloaderTable = "dbfs:/tmp/ingestAutoloader/autoloader"
autoloaderCheckpoint = autoloaderTable + "/checkpoint"

In [0]:
# Ingest the CSV files in the landing zone into the Auto Loader Delta table.
#
# The stream reads with the `cloudFiles` source (CSV), keeps its inferred schema
# and its progress under `autoloaderCheckpoint`, and uses a once-only trigger so
# it processes what is currently available and then stops.
#
# `start()` returns immediately while the batch runs in the background, so the
# cell blocks on `awaitTermination()`; otherwise the following cell can read the
# Delta table before this run has committed any data (e.g. under "Run All").
(
  spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.schemaLocation", autoloaderCheckpoint)
  .load(landingZoneLocation)
  .writeStream
  .format("delta")
  .trigger(once=True)
  .option("checkpointLocation", autoloaderCheckpoint)
  .start(autoloaderTable)
  .awaitTermination()
)

In [0]:
# Batch-read the Delta table written by the Auto Loader stream and render it.
autoloader_reader = spark.read.format("delta")
dfAutoloader = autoloader_reader.load(autoloaderTable)
display(dfAutoloader)

usaf,wban,datetime,latitude,longitude,elevation,windAngle,windSpeed,temperature,seaLvlPressure,cloudCoverage,presentWeatherIndicator,pastWeatherIndicator,precipTime,precipDepth,snowDepth,stationName,countryOrRegion,p_k,year,day,version,month,_rescued_data
999999,4125,2020-05-09 12:45:00,44.556,-119.646,684.0,,,8.0,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,9,1.0,5,"{""_c0"":""0"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,4125,2020-05-01 21:05:00,44.556,-119.646,684.0,,,21.6,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,1,1.0,5,"{""_c0"":""1"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,22016,2020-05-04 08:10:00,29.348,-103.209,1065.0,,,28.5,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,4,1.0,5,"{""_c0"":""2"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,4125,2020-05-05 07:59:00,44.556,-119.646,684.0,,,,,,,,24.0,0.0,,JOHN DAY 35 WNW,US,999999-04125,2020,5,1.0,5,"{""_c0"":""3"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,53182,2020-05-02 00:35:00,36.568,-101.61,1000.0,,,30.0,,,,,,,,,,999999-53182,2020,2,1.0,5,"{""_c0"":""4"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,22016,2020-05-09 02:05:00,29.348,-103.209,1065.0,,,20.3,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,9,1.0,5,"{""_c0"":""5"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,4125,2020-05-07 19:40:00,44.556,-119.646,684.0,,,16.0,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,7,1.0,5,"{""_c0"":""6"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,53182,2020-05-08 12:20:00,36.568,-101.61,1000.0,,,6.3,,,,,,,,,,999999-53182,2020,8,1.0,5,"{""_c0"":""7"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,22016,2020-05-08 12:40:00,29.348,-103.209,1065.0,,,24.9,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,8,1.0,5,"{""_c0"":""8"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,22016,2020-05-04 23:05:00,29.348,-103.209,1065.0,,,36.4,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,4,1.0,5,"{""_c0"":""9"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"


In [0]:
%fs ls dbfs:/tmp/ingestAutoloader/autoloader/

path,name,size
dbfs:/tmp/ingestAutoloader/autoloader/_delta_log/,_delta_log/,0
dbfs:/tmp/ingestAutoloader/autoloader/checkpoint/,checkpoint/,0
dbfs:/tmp/ingestAutoloader/autoloader/part-00000-4cd8af14-b96d-4267-b02b-22bbaec73409-c000.snappy.parquet,part-00000-4cd8af14-b96d-4267-b02b-22bbaec73409-c000.snappy.parquet,83526123
dbfs:/tmp/ingestAutoloader/autoloader/part-00000-563e7bfc-accc-422d-bfbe-097d7c05f1fe-c000.snappy.parquet,part-00000-563e7bfc-accc-422d-bfbe-097d7c05f1fe-c000.snappy.parquet,83911956


In [0]:
# Second drop into the landing zone: same steps as the May file, for the window
# 2020-06-01 through 2020-06-10. This gives COPY INTO and Auto Loader a new file
# to pick up on their next run.
start_date = parser.parse('2020-6-1')
end_date = parser.parse('2020-6-10')
isd = NoaaIsdWeather(start_date, end_date)
# Collects the whole Spark DataFrame to the driver as pandas before writing one CSV.
# NOTE(review): pandas `to_csv` returns None when given a path, so `pdf` ends up
# as None rather than a DataFrame — confirm nothing relies on it.
pdf = isd.to_spark_dataframe().toPandas().to_csv("/dbfs/tmp/ingestGettingStartedDemo/June_2020_pandas.csv")

In [0]:
%fs ls /tmp/ingestGettingStartedDemo/

path,name,size
dbfs:/tmp/ingestGettingStartedDemo/June_2020_pandas.csv,June_2020_pandas.csv,418911231
dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv,May_2020_pandas.csv,417146225


In [0]:
%sql
-- Switch to the bronze database, then run the same COPY INTO again now that a
-- second CSV file has landed in the source folder.
USE ingest_bronze;

COPY INTO delta.`dbfs:/tmp/copy_into/copyfolder/user`
FROM 'dbfs:/tmp/ingestGettingStartedDemo/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true');

num_affected_rows,num_inserted_rows
3180148,3180148


In [0]:
%sql
-- Total number of rows in the weather table after the second load.
SELECT COUNT(*)
FROM weather

count(1)
6350084


## Ingesting the same file using Autoloader

In [0]:
# Run the Auto Loader ingestion again now that a second CSV file has landed.
#
# The stream reuses the same schema/checkpoint location as the first run and a
# once-only trigger, so it processes what is pending and then stops.
#
# `start()` returns immediately while the batch runs in the background, so the
# cell blocks on `awaitTermination()`; otherwise the following cell can read the
# Delta table before this run has committed its data (e.g. under "Run All").
(
  spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.schemaLocation", autoloaderCheckpoint)
  .load(landingZoneLocation)
  .writeStream
  .format("delta")
  .trigger(once=True)
  .option("checkpointLocation", autoloaderCheckpoint)
  .start(autoloaderTable)
  .awaitTermination()
)

In [0]:
# Re-read the Auto Loader Delta table after the second run and render it.
autoloader_reader = spark.read.format("delta")
dfAutoloader = autoloader_reader.load(autoloaderTable)
display(dfAutoloader)

usaf,wban,datetime,latitude,longitude,elevation,windAngle,windSpeed,temperature,seaLvlPressure,cloudCoverage,presentWeatherIndicator,pastWeatherIndicator,precipTime,precipDepth,snowDepth,stationName,countryOrRegion,p_k,year,day,version,month,_rescued_data
999999,4125,2020-05-09 12:45:00,44.556,-119.646,684.0,,,8.0,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,9,1.0,5,"{""_c0"":""0"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,4125,2020-05-01 21:05:00,44.556,-119.646,684.0,,,21.6,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,1,1.0,5,"{""_c0"":""1"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,22016,2020-05-04 08:10:00,29.348,-103.209,1065.0,,,28.5,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,4,1.0,5,"{""_c0"":""2"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,4125,2020-05-05 07:59:00,44.556,-119.646,684.0,,,,,,,,24.0,0.0,,JOHN DAY 35 WNW,US,999999-04125,2020,5,1.0,5,"{""_c0"":""3"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,53182,2020-05-02 00:35:00,36.568,-101.61,1000.0,,,30.0,,,,,,,,,,999999-53182,2020,2,1.0,5,"{""_c0"":""4"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,22016,2020-05-09 02:05:00,29.348,-103.209,1065.0,,,20.3,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,9,1.0,5,"{""_c0"":""5"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,4125,2020-05-07 19:40:00,44.556,-119.646,684.0,,,16.0,,,,,,,,JOHN DAY 35 WNW,US,999999-04125,2020,7,1.0,5,"{""_c0"":""6"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,53182,2020-05-08 12:20:00,36.568,-101.61,1000.0,,,6.3,,,,,,,,,,999999-53182,2020,8,1.0,5,"{""_c0"":""7"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,22016,2020-05-08 12:40:00,29.348,-103.209,1065.0,,,24.9,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,8,1.0,5,"{""_c0"":""8"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
999999,22016,2020-05-04 23:05:00,29.348,-103.209,1065.0,,,36.4,,,,,,,,PANTHER JUNCTION 2 N,US,999999-22016,2020,4,1.0,5,"{""_c0"":""9"",""_file_path"":""dbfs:/tmp/ingestGettingStartedDemo/May_2020_pandas.csv""}"
