In [0]:
import json
from urllib.request import urlretrieve
from collections import defaultdict
import pandas as pd
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
import requests
from pyspark.sql import Row

## Create database

In [0]:
%sql 
-- Create the database used by the saveAsTable calls in this notebook; does nothing if it already exists.
CREATE DATABASE IF NOT EXISTS MILESTONE2WATER

# SHORTAGE DATA API

#### In this first section, the data is read into a list of dictionary records, converted to a pandas DataFrame, and stored as CSV

- URL = https://data.ca.gov/dataset/household-water-supply-shortage-reporting-system-data/resource/1625f0d1-20c0-4059-ba59-b26f0ff3a1c3
- Click on Data API
- Copy Query example for request URL with limit and search terms
- The data can also be downloaded

In [0]:
# Approach adapted from:
# https://stackoverflow.com/questions/63650969/databricks-consuming-rest-api?rq=1#

# Call the datastore_search endpoint for the shortage resource (limit=9000)
# and decode the JSON body.
shortage_response = requests.get('https://data.cnra.ca.gov/api/3/action/datastore_search?resource_id=e1fd9f48-a613-4567-8042-3d2e064d77c8&limit=9000')
api_retrieve = shortage_response.json()

# The rows are the list stored under result -> records.
data_shortage = api_retrieve['result']['records']

# Turn the list of record dicts into a pandas DataFrame and write it to DBFS as CSV.
pd_df_shortage = pd.DataFrame(data_shortage)
pd_df_shortage.to_csv('/dbfs/FileStore/shortage.csv')



#df_shortage = spark.createDataFrame(Row(**row) for row in data_shortage)

### Get Well Completion data

In [0]:
# First page of the well-completion resource (limit=3200 per request).
well_completion_api = requests.get('https://data.cnra.ca.gov/api/3/action/datastore_search?resource_id=8da7b93b-4e69-495d-9caa-335691a1896b&limit=3200').json()
list_data_completion = well_completion_api['result']['records']

# Keep following the "next" link returned with each page, appending its records
# to the running list, until the most recently fetched page has no records.
while True:
    current_result = well_completion_api['result']
    if not current_result['records']:
        break
    next_page_url = 'https://data.cnra.ca.gov' + current_result['_links']["next"]
    well_completion_api = requests.get(next_page_url).json()
    list_data_completion.extend(well_completion_api['result']['records'])

# Build one pandas DataFrame from all collected records and write it to DBFS as CSV.
pd_df_wellcompletion = pd.DataFrame(list_data_completion)
pd_df_wellcompletion.to_csv('/dbfs/FileStore/wellcompletion.csv')

### Get periodic ground water data

In [0]:
# First page of the periodic groundwater resource (limit=4000 per request).
groundwater_request_api = requests.get('https://data.cnra.ca.gov/api/3/action/datastore_search?resource_id=bfa9f262-24a1-45bd-8dc8-138bc8107266&limit=4000').json()
list_data_groundwater = groundwater_request_api['result']['records']

# Follow the "next" link of each response and append its records until a page
# comes back with an empty records list.
while groundwater_request_api['result']['records']:
    groundwater_request_api = requests.get('https://data.cnra.ca.gov'+groundwater_request_api['result']['_links']["next"]).json()
    list_data_groundwater.extend(groundwater_request_api['result']['records'])

# The frame must be bound to the same name that is written out on the next line;
# binding it to any other name makes the to_csv call raise NameError.
pd_df_groundwater = pd.DataFrame(list_data_groundwater)
pd_df_groundwater.to_csv('/dbfs/FileStore/groundwater.csv')

### Get Station information

In [0]:
# Stations data has the location info for the measurement wells.
# First page of the station resource (limit=32000 per request).
stations_location_api = requests.get('https://data.cnra.ca.gov/api/3/action/datastore_search?resource_id=af157380-fb42-4abf-b72a-6f9f98868077&limit=32000').json()
list_data_stations = stations_location_api['result']['records']

# Follow the "next" link of each response until a page has no records.
while stations_location_api['result']['records']:
    stations_location_api = requests.get('https://data.cnra.ca.gov'+stations_location_api['result']['_links']["next"]).json()
    # Append to the list created above (list_data_stations); no other
    # accumulator exists in this cell.
    list_data_stations.extend(stations_location_api['result']['records'])

# Build one pandas DataFrame from all collected records and write it to DBFS as CSV.
pd_df_station = pd.DataFrame(list_data_stations)
pd_df_station.to_csv('/dbfs/FileStore/station.csv')

### Convert a pandas dataframe to Spark with an inferred schema


- spark_df = pandas_to_spark(pandas_df)

In [0]:
from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    """Map a pandas/NumPy dtype to the Spark SQL type used for that column.

    Args:
        f: A column dtype (an element of ``DataFrame.dtypes``) or its string name.

    Returns:
        A ``pyspark.sql.types`` instance: ``TimestampType`` for
        ``datetime64[ns]``, ``LongType`` for ``int64``, ``IntegerType`` for
        ``int32``, ``DoubleType`` for ``float64``, and ``StringType`` for
        every other dtype.
    """
    if f == 'datetime64[ns]':
        return TimestampType()
    elif f == 'int64':
        return LongType()
    elif f == 'int32':
        return IntegerType()
    elif f == 'float64':
        # float64 is a 64-bit value; Spark's FloatType is 32-bit and would
        # truncate it (e.g. latitude/longitude digits), so use DoubleType.
        return DoubleType()
    else:
        return StringType()

def define_structure(string, format_type):
    """Build the Spark ``StructField`` for one column.

    Args:
        string: Column name.
        format_type: The column's pandas dtype, passed to ``equivalent_type``.

    Returns:
        ``StructField(string, <spark type>)``; the type falls back to
        ``StringType`` if the dtype lookup raises.
    """
    try:
        typo = equivalent_type(format_type)
    except Exception:
        # Best-effort fallback for dtypes that cannot be compared/mapped.
        # Catch Exception rather than everything so that KeyboardInterrupt
        # and SystemExit still propagate.
        typo = StringType()
    return StructField(string, typo)

# Convert a pandas DataFrame into a Spark DataFrame with an explicit schema.
def pandas_to_spark(pandas_df):
    """Create a Spark DataFrame from ``pandas_df`` using a dtype-derived schema.

    One ``StructField`` is produced per column by ``define_structure`` from
    the column name and its pandas dtype; the resulting ``StructType`` is
    passed to ``sqlContext.createDataFrame``.

    Args:
        pandas_df: The pandas DataFrame to convert.

    Returns:
        The DataFrame returned by ``sqlContext.createDataFrame``.

    Note:
        Relies on a notebook-level ``sqlContext`` being defined before the call.
    """
    fields = [
        define_structure(column_name, column_dtype)
        for column_name, column_dtype in zip(pandas_df.columns, pandas_df.dtypes)
    ]
    return sqlContext.createDataFrame(pandas_df, StructType(fields))

# Method 2
- Create a Spark dataframe from the JSON data. Coalesce it into a single partition and store it as one CSV part file.
- Read the CSV file back using Spark

In [0]:

# Call the datastore_search endpoint for the shortage resource (limit=9000)
# and keep the list stored under result -> records.
api_retrieve = requests.get('https://data.cnra.ca.gov/api/3/action/datastore_search?resource_id=e1fd9f48-a613-4567-8042-3d2e064d77c8&limit=9000').json()
list_data_shortage = api_retrieve['result']['records']

# One Row per record dict; the dict keys become the column names.
df_shortage = spark.createDataFrame([Row(**record) for record in list_data_shortage])

# Collapse to a single partition and write it out as CSV with a header row.
(
    df_shortage
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("dbfs:/FileStore/WaterWell/shortage.csv")
)

Spark dataframe stored as CSV for well completion

In [0]:
# Imported here because this cell uses these classes and the notebook's import
# cell does not bring them into scope.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Stop the running SparkContext and start a new one with a larger
# spark.rpc.message.maxSize, then rebind `sc` and `spark` to the new context.
# NOTE(review): stopping the shared context on a Databricks cluster can affect
# other notebooks attached to it; confirm this is intended.
sc.stop()
configura = SparkConf().set('spark.rpc.message.maxSize', '1024')
sc = SparkContext.getOrCreate(conf=configura)
spark = SparkSession.builder.getOrCreate()

In [0]:
# First page of the well-completion resource (limit=4000 per request).
well_completion_api = requests.get('https://data.cnra.ca.gov/api/3/action/datastore_search?resource_id=8da7b93b-4e69-495d-9caa-335691a1896b&limit=4000').json()
list_data_wellcompletion = well_completion_api['result']['records']

# Keep following the "next" link returned with each page, appending its records,
# until the most recently fetched page has no records.
while True:
    current_result = well_completion_api['result']
    if not current_result['records']:
        break
    next_page_url = 'https://data.cnra.ca.gov' + current_result['_links']["next"]
    well_completion_api = requests.get(next_page_url).json()
    list_data_wellcompletion.extend(well_completion_api['result']['records'])

# One Row per record dict; the dict keys become the column names.
df_wellcompletion = spark.createDataFrame([Row(**record) for record in list_data_wellcompletion])



In [0]:
# Display the number of rows in the well-completion Spark DataFrame.
df_wellcompletion.count()

In [0]:
# Collapse to a single partition and write the well-completion data to DBFS as CSV with a header row.
df_wellcompletion.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/WaterWell/spark_df_wellcompletion.csv")

## Spark dataframe saved as CSV for groundwater

In [0]:
# First page of the periodic groundwater resource (limit=4000 per request).
groundwater_request_api = requests.get('https://data.cnra.ca.gov/api/3/action/datastore_search?resource_id=bfa9f262-24a1-45bd-8dc8-138bc8107266&limit=4000').json()
list_data_groundwater = groundwater_request_api['result']['records']

# Keep following the "next" link returned with each page, appending its records,
# until the most recently fetched page has no records.
while True:
    current_result = groundwater_request_api['result']
    if not current_result['records']:
        break
    next_page_url = 'https://data.cnra.ca.gov' + current_result['_links']["next"]
    groundwater_request_api = requests.get(next_page_url).json()
    list_data_groundwater.extend(groundwater_request_api['result']['records'])

# One Row per record dict, then a single-partition CSV export with a header row.
df_groundwater = spark.createDataFrame([Row(**record) for record in list_data_groundwater])
(
    df_groundwater
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("dbfs:/FileStore/WaterWell/groundwater.csv")
)

## Spark dataframe saved as CSV for station

In [0]:
%sql
-- List the tables registered in the milestone2water database.
show tables in milestone2water

database,tableName,isTemporary
milestone2water,table_completion,False
milestone2water,table_groundwater,False
milestone2water,table_shortage,False
milestone2water,table_station,False


In [0]:
# Read the whole station table and export it to DBFS as a single-partition CSV with a header row.
spark_df_station = spark.sql("""select * from milestone2water.table_station""")
spark_df_station.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/WaterWell/spark_df_station.csv")

In [0]:
# Read the whole well-completion table and export it to DBFS as a single-partition CSV with a header row.
# NOTE(review): this is the same target path as the earlier df_wellcompletion
# export in this notebook; no save mode is set, so the second write to an
# existing path is expected to fail — confirm which cell is meant to run.
spark_df_wellcompletion = spark.sql("""select * from milestone2water.table_completion""")
spark_df_wellcompletion.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/WaterWell/spark_df_wellcompletion.csv")


In [0]:
# Read the whole groundwater table and export it to DBFS as a single-partition CSV with a header row.
spark_df_groundwater = spark.sql("""select * from milestone2water.table_groundwater""")
spark_df_groundwater.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/WaterWell/spark_df_groundwater.csv")

In [0]:
# Read the whole shortage table and export it to DBFS as a single-partition CSV with a header row.
spark_df_shortage = spark.sql("""select * from milestone2water.table_shortage""")
spark_df_shortage.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/WaterWell/spark_df_shortage.csv")

### Create the URL, which when pasted in a new browser window will automatically download the file

From the URL of the notebook, pick the strings shown

**https://adb-8268979028189023.3.azuredatabricks.net/?o=8268979028189023**#notebook/2018882991844828/command/2018882991844900


Insert the output folder name and the name of the single CSV part file created (go to Data tab -> Create Table -> DBFS -> FileStore)


https://adb-8268979028189023.3.azuredatabricks.net/files/WaterWell/shortage.csv/part-00000-tid-4400703674217460447-3a2e9c38-2679-4e6d-b5d7-e1a54b2d65ab-2-1-c000.csv?o=8268979028189023

### Read the "part" dataframe generated CSV into spark dataframe

In [0]:
# Read the shortage CSV written by Spark back into a Spark DataFrame.

# Source path and format.
file_type = "csv"
file_location = "/FileStore/WaterWell/shortage.csv"

# CSV reader settings: first row is the header, comma-separated, and no schema
# inference, so every column is read as a string.
delimiter = ","
first_row_is_header = "true"
infer_schema = "false"

# These options apply to CSV input; for other file types they are ignored.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df)

Well Depth,StatusType,Status,Household Support,Pump Rate Reduction,Water Issues,Shortage Type,Was Issue Resolved?,CREATE DATE,Approximate Issue Start Date,Measure Date,Primary Usages,Region,LONGITUDE,Well to Water Depth,ID,CITY,Approximate Repair Cost,County,Report Date,LATITUDE,_id,Additional Info
45',Outage,Undefined,,>45',,Dry well (groundwater),,09/11/2015,07/07/2015,,Household,Inland,-120.884807,"May, 2015",3102,Empire,,Stanislaus,2015-09-11T00:00:00,37.657762,1.0,
47',Outage,Undefined,,44',,Dry well (groundwater),,09/11/2015,07/27/2015,,Household,Inland,-120.71052,Sept. 2014,3103,Hickman,,Stanislaus,2015-09-11T00:00:00,37.6310712,2.0,
85',Outage,Undefined,,>85',,Dry well (groundwater),,09/11/2015,08/01/2015,,Household,Inland,-120.816245,42220,3105,Denair,,Stanislaus,2015-09-11T00:00:00,37.578196,3.0,
110',Outage,Undefined,,UNK,,Dry well (groundwater),,09/11/2015,08/01/2015,,Household,Inland,-120.960309,UNK,3108,Ceres,,Stanislaus,2015-09-11T00:00:00,37.60228,4.0,
,Outage,Undefined,,,,Dry well (groundwater),,07/31/2015,,,Household,Inland,-120.910673,,942,Oakdale,,Stanislaus,2015-07-31T00:00:00,37.73886,66.0,
65',Outage,Undefined,,65',,Dry well (groundwater),,09/11/2015,08/15/2015,,Household,Inland,-121.033522,65' on 8/15/15,3109,Newman,,Stanislaus,2015-09-11T00:00:00,37.317443,5.0,
UNK,Outage,Undefined,,UNK,,Dry well (groundwater),,09/11/2015,08/20/2015,,Household,Inland,-120.755139,UNK,3110,Waterford,,Stanislaus,2015-09-11T00:00:00,37.6603629,6.0,
,Outage,Undefined,,,,Dry well (groundwater),,09/11/2015,09/10/2015,,Household,Inland,-120.899061,,3111,Modesto,,Stanislaus,2015-09-11T00:00:00,37.678451,7.0,
,Outage,Undefined,,,,Dry well (groundwater),,09/11/2015,09/08/2015,,Household,Inland,-120.9008227,,3112,Empire,,Stanislaus,2015-09-11T00:00:00,37.638741,8.0,
"Original well - 355 ft, new well - 540 ft",Resolved,Resolved,owner-occupied,75%,"Reduction in water pressure, lower flows.",,,,,,,,,,,,,,,,,


In [0]:
# DBFS housekeeping. The commented-out calls are alternative list/remove
# targets kept for manual use; only the last line runs.
# dbutils.fs.ls(f"dbfs:/FileStore/shortage.csv")
# dbutils.fs.rm(f"dbfs:/FileStore/WaterWell/shortage.csv", True)

#dbutils.fs.ls(f"dbfs:/FileStore/shortage.csv")
#dbutils.fs.rm(f"dbfs:/FileStore/WaterWell/wellcompletion.csv", True)
# Remove the Spark-written groundwater CSV output; the second argument (True)
# is presumably the recurse flag — verify against the dbutils.fs.rm signature.
dbutils.fs.rm(f"dbfs:/FileStore/WaterWell/groundwater.csv", True)

In [0]:
#pandas_df = pd.read_csv('/dbfs/FileStore/shortage.csv', header='infer') 

### Check if you can read back the saved csv

In [0]:
# Display the column labels of the pandas shortage DataFrame.
pd_df_shortage.columns

In [0]:
# Note: see https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3137082781873852/3704545280501166/1264763342038607/latest.html
# In Databricks the global SparkContext object is available as `sc` for this purpose.

from pyspark.sql import SQLContext
# Wrap the existing SparkContext; pandas_to_spark() in this notebook reads this `sqlContext` global.
sqlContext = SQLContext(sc)
# Bare expression so the cell displays the object.
sqlContext 

In [0]:
# Spark column names with every space replaced by an underscore, in the same order.
new_column_names = list(map(lambda col_name: col_name.replace(" ", "_"), df_shortage.columns))

# Rebind df_shortage to a DataFrame carrying the renamed columns.
df_shortage = df_shortage.toDF(*new_column_names)


In [0]:
#display(df_shortage)

Well_Depth,StatusType,Status,Household_Support,Pump_Rate_Reduction,Water_Issues,Shortage_Type,Was_Issue_Resolved?,CREATE_DATE,Approximate_Issue_Start_Date,Measure_Date,Primary_Usages,Region,LONGITUDE,Well_to_Water_Depth,ID,CITY,Approximate_Repair_Cost,County,Report_Date,LATITUDE,_id,Additional_Info
45',Outage,Undefined,,>45',,Dry well (groundwater),,09/11/2015,07/07/2015,,Household,Inland,-120.884807,"May, 2015",3102,Empire,,Stanislaus,2015-09-11T00:00:00,37.657762,1,
47',Outage,Undefined,,44',,Dry well (groundwater),,09/11/2015,07/27/2015,,Household,Inland,-120.71052,Sept. 2014,3103,Hickman,,Stanislaus,2015-09-11T00:00:00,37.6310712,2,
85',Outage,Undefined,,>85',,Dry well (groundwater),,09/11/2015,08/01/2015,,Household,Inland,-120.816245,42220,3105,Denair,,Stanislaus,2015-09-11T00:00:00,37.578196,3,
110',Outage,Undefined,,UNK,,Dry well (groundwater),,09/11/2015,08/01/2015,,Household,Inland,-120.960309,UNK,3108,Ceres,,Stanislaus,2015-09-11T00:00:00,37.60228,4,
,Outage,Undefined,,,,Dry well (groundwater),,07/31/2015,,,Household,Inland,-120.910673,,942,Oakdale,,Stanislaus,2015-07-31T00:00:00,37.73886,66,
65',Outage,Undefined,,65',,Dry well (groundwater),,09/11/2015,08/15/2015,,Household,Inland,-121.033522,65' on 8/15/15,3109,Newman,,Stanislaus,2015-09-11T00:00:00,37.317443,5,
UNK,Outage,Undefined,,UNK,,Dry well (groundwater),,09/11/2015,08/20/2015,,Household,Inland,-120.755139,UNK,3110,Waterford,,Stanislaus,2015-09-11T00:00:00,37.6603629,6,
,Outage,Undefined,,,,Dry well (groundwater),,09/11/2015,09/10/2015,,Household,Inland,-120.899061,,3111,Modesto,,Stanislaus,2015-09-11T00:00:00,37.678451,7,
,Outage,Undefined,,,,Dry well (groundwater),,09/11/2015,09/08/2015,,Household,Inland,-120.9008227,,3112,Empire,,Stanislaus,2015-09-11T00:00:00,37.638741,8,
"Original well - 355 ft, new well - 540 ft",Resolved,Resolved,owner-occupied,75%,"Reduction in water pressure, lower flows. ,Reduction in water quality ,Well is catching air, have to wait to be able to pump.","New well drilled with poor water quality, water level still falling","Yes, drilled replacement well. ,Poor water quality",09/19/2015,01/01/2013,09/19/2015,Combination of Household/Agriculture,Southern,-120.621761,280 ft - continuous measurement with Wellntel,3121,Templeton,"$25,000",San Luis Obispo,2015-09-19T00:00:00,35.544734,9,Many of our neighbors have drilled new wells into poor quality water or are trucking water for their homes.


In [0]:
# Persist the shortage DataFrame as a Parquet table in the MILESTONE2WATER database.
# NOTE(review): no save mode is set, so re-running this cell once the table
# exists is expected to fail — confirm.
perm_table_name = "table_shortage"
df_shortage.write.format("parquet").saveAsTable(f"MILESTONE2WATER.{perm_table_name}")  

In [0]:
%sql

-- List the tables registered in the milestone2water database.
SHOW TABLES IN milestone2water


database,tableName,isTemporary
milestone2water,table_completion,False
milestone2water,table_shortage,False


In [0]:
%sql
-- Preview three rows of the shortage table.
select * from milestone2water.table_shortage limit 3

Well_Depth,StatusType,Status,Household_Support,Pump_Rate_Reduction,Water_Issues,Shortage_Type,Was_Issue_Resolved?,CREATE_DATE,Approximate_Issue_Start_Date,Measure_Date,Primary_Usages,Region,LONGITUDE,Well_to_Water_Depth,ID,CITY,Approximate_Repair_Cost,County,Report_Date,LATITUDE,_id,Additional_Info
,Outage,Undefined,,,Well is dry (no longer producing water),Dry well (groundwater),,05/24/2021,04/06/2021,,Household,Southern,-120.61821,,10230,TEMPLETON,,San Luis Obispo,2021-05-24T00:00:00,35.50495,2830,
166.0,Resolved,Interim Solution,owner-occupied,"Yes, about a 30-40% reduction","Reduction in water pressure, lower flows.",Dry well (groundwater),"No, trucking in water",05/25/2021,05/04/2021,05/18/2021,Household,Inland,-119.78637,148.0,10247,Lemoore,,Kings,2021-05-25T00:00:00,36.30101,2831,
200.0,Resolved,Interim Solution,owner-occupied,It's no longer producing any water,Well is dry (no longer producing water),Dry well (groundwater),"No, trucking in water",05/25/2021,05/14/2021,05/20/2021,Household,Inland,-119.995,200.0,10248,Madera,,Madera,2021-05-25T00:00:00,37.05132,2832,


In [0]:
# Persist the well-completion DataFrame as a Parquet table in the MILESTONE2WATER database.
# NOTE(review): no save mode is set, so re-running this cell once the table
# exists is expected to fail — confirm.
perm_table_name = "table_completion"
df_wellcompletion.write.format("parquet").saveAsTable(f"MILESTONE2WATER.{perm_table_name}")  

In [0]:
%sql
-- List the tables registered in the MILESTONE2WATER database.
show tables in MILESTONE2WATER

database,tableName,isTemporary
milestone2water,table_completion,False
milestone2water,table_shortage,False


In [0]:
%sql
-- Row count of the well-completion table.
SELECT COUNT(1) FROM MILESTONE2WATER.table_completion

count(1)
1043032


In [0]:
%sql
-- Preview a single row of the well-completion table to inspect its columns.
SELECT * FROM MILESTONE2WATER.table_completion LIMIT 1

DECIMALLATITUDE,WORKFLOWSTATUS,LLACCURACY,PERMITDATE,PUMPTESTLENGTH,SECTION,REGIONOFFICE,DRILLINGMETHOD,OTHEROBSERVATIONS,WELLYIELDUNITOFMEASURE,WELLLOCATION,PERMITNUMBER,TOTALDRAWDOWN,VERTICALDATUM,BOTTOMOFPERFORATEDINTERVAL,GROUNDSURFACEELEVATION,STATICWATERLEVEL,RECORDTYPE,DRILLERLICENSENUMBER,FLUID,TESTTYPE,APN,PLANNEDUSEFORMERUSE,LOCALPERMITAGENCY,TOPOFPERFORATEDINTERVAL,WCRNUMBER,TOTALDRILLDEPTH,LEGACYLOGNUMBER,ELEVATIONDETERMINATIONMETHOD,ELEVATIONACCURACY,DECIMALLONGITUDE,METHODOFDETERMINATIONLL,HORIZONTALDATUM,TOWNSHIP,DATEWORKENDED,CITY,TOTALCOMPLETEDDEPTH,OWNERASSIGNEDWELLNUMBER,COUNTYNAME,RANGE,BASELINEMERIDIAN,RECEIVEDDATE,DRILLERNAME,WELLYIELD,_id,CASINGDIAMETER
34.12784,,Centroid of Section,,,36,DWR Southern Region Office,Reverse Circulation,,GPM,EL CAMPO DR,,,,1080,,,WellCompletion/New/Production or Monitoring/NA,510011,Not Available at Conversion,,,Water Supply Public,"LA County Department of Public Health, Department of Health Services, Drinking Water Program",610,WCR1776-006426,,294182,,,-118.09132,Derived from TRS,,01N,,Pasadena,1100,,Los Angeles,12W,San Bernardino,,McCalla Brothers Division McCalla Brothers Division of Layne Western,2270,260064,28


In [0]:
%sql
-- Show the column names and data types of the well-completion table.
-- NOTE(review): WCRNUMBER is flagged on the statement below, presumably as the column of interest — confirm.
DESCRIBE MILESTONE2WATER.table_completion;--WCRNUMBER

col_name,data_type,comment
DECIMALLATITUDE,string,
WORKFLOWSTATUS,string,
LLACCURACY,string,
PERMITDATE,string,
PUMPTESTLENGTH,string,
SECTION,string,
REGIONOFFICE,string,
DRILLINGMETHOD,string,
OTHEROBSERVATIONS,string,
WELLYIELDUNITOFMEASURE,string,


## Conversion of a Spark dataframe to pandas is sped up by enabling Arrow as shown below (8 sec in this case)

In [0]:
# spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# pd_df_wellcompletion = df_wellcompletion.toPandas()

In [0]:
#display(pd_df_wellcompletion)

DECIMALLATITUDE,WORKFLOWSTATUS,LLACCURACY,PERMITDATE,PUMPTESTLENGTH,SECTION,REGIONOFFICE,DRILLINGMETHOD,OTHEROBSERVATIONS,WELLYIELDUNITOFMEASURE,WELLLOCATION,PERMITNUMBER,TOTALDRAWDOWN,VERTICALDATUM,BOTTOMOFPERFORATEDINTERVAL,GROUNDSURFACEELEVATION,STATICWATERLEVEL,RECORDTYPE,DRILLERLICENSENUMBER,FLUID,TESTTYPE,APN,PLANNEDUSEFORMERUSE,LOCALPERMITAGENCY,TOPOFPERFORATEDINTERVAL,WCRNUMBER,TOTALDRILLDEPTH,LEGACYLOGNUMBER,ELEVATIONDETERMINATIONMETHOD,ELEVATIONACCURACY,DECIMALLONGITUDE,METHODOFDETERMINATIONLL,HORIZONTALDATUM,TOWNSHIP,DATEWORKENDED,CITY,TOTALCOMPLETEDDEPTH,OWNERASSIGNEDWELLNUMBER,COUNTYNAME,RANGE,BASELINEMERIDIAN,RECEIVEDDATE,DRILLERNAME,WELLYIELD,_id,CASINGDIAMETER
,,,,,NO NUMBER,,,,,,,,,,,,WellCompletion/New/Production or Monitoring/NA,,,,,,,,WCR0097017,,095043,,,,,,,,,,,,,,,,,1042952,
37.532473,,>50 FT,,,03,DWR North Central Region Office,,,,41100 ROBERTS AVENUE,2015-0443,,,,,,WellCompletion/Destruction/NA/NA,938110,,,525600185,,Alameda County Water District,,WCR2015-006846,,E0289784,,,-121.956264,Derived from Address,,05S,10/29/2015,FREMONT,55.0,MW-10,Alameda,01W,Mount Diablo,,CASCADE DRILLING L P,,1,
37.52371,,Unknown,,,02,DWR North Central Region Office,SONIC,,,8333 ENTERPRISE DR.,2014-0063,,,18.0,,,WellCompletion/New/Production or Monitoring/NA,953646,,,921406,Injection,Alameda County Water District,13.0,WCR2014-015309,,E0205117,,,-122.048878,Unknown,,05S,3/5/2014,NEWARK,18.1,INJ-B11,Alameda,02W,Mount Diablo,,NATIONAL E W P INC,,2,2.375
37.696685,,Unknown,,,01,DWR North Central Region Office,,,,7200 JOHNSON DRIVE,2015080,,,,,,WellCompletion/Destruction/NA/NA,695970,,,941131119-1,Not Specified,Zone 7 Water Agency - Alameda County Flood Control and Water Conservation District,,WCR2015-010054,,E0275188,,,-121.917426,Unknown,,03S,7/15/2015,PLEASANTON,20.0,UST-1,Alameda,01W,Mount Diablo,,ENVIRONMENTAL CONTROL ASSOCIATES,,3,
37.4315,Completeness Review - Auto-Complete - 06-JAN-20,,6/21/2019,4.0,09,DWR North Central Region Office,Direct Rotary,,GPM,22148 Eden Canyon RD,w2019-0491,,,840.0,,600.0,WellCompletion/New/Production or Monitoring/NA,510952,Bentonite,Air Lift,,Water Supply Domestic,"Alameda County Public Works Agency, Water Resources Section",600.0,WCR2019-015599,840.0,,,,-122.08,,WGS84,06S,10/22/2019,Castro Valley,840.0,2,Alameda,02W,Mount Diablo,11/1/2019,MARTELL WATER SYSTEMS INC,1.0,4,
37.782289,Completeness Review - Auto-Complete - 12-NOV-18,10 Ft,7/13/2018,,06,DWR North Central Region Office,Other Hollow-stem Auger,,,2301 E 23rd AVE,W2018-0633,,NAVD88,20.0,21.4,,WellCompletion/New/Production or Monitoring/NA,484288,Bentonite,,19-102-1-1,Monitoring,"Alameda County Public Works Agency, Water Resources Section",5.0,WCR2018-007848,20.0,,Surveyed,0.1 Ft,-122.2368998,Other,WGS84,02S,7/18/2018,Oakland,20.0,EW-1A,Alameda,03W,Mount Diablo,9/7/2018,EXPLORATION GEOSERVICES INC,,5,
37.57119,,Centroid of Section,,,21,DWR North Central Region Office,,,,,,,,,,,WellCompletion/New/Production or Monitoring/NA,,,,,,Alameda County Water District,,WCR0120609,,ACWD1792,,,-121.97812,Derived from TRS,,04S,,,,,Alameda,01W,Mount Diablo,,,,57,
37.6769888,Completeness Review - Auto-Complete - 05-AUG-19,0.1 Ft,4/29/2019,,13,DWR North Central Region Office,,,,1436 Grant AVE,W2019-0308,,NAVD88,,18.54,6.85,WellCompletion/Destruction/NA/NA,1046151,,,411-039-001,Remediation,"Alameda County Public Works Agency, Water Resources Section",,WCR2019-007591,,,GPS,,-122.1428659,GPS,NAD83,03S,5/30/2019,San Lorenzo,20.0,EX-1,Alameda,03W,Mount Diablo,6/4/2019,SLAGLE DRILLING CORP,,6,
37.494256,Completeness Review - Auto-Complete - 03-MAY-21,,2/3/2021,,14,DWR North Central Region Office,Auger,,,45500 Fremont BLVD,2021-0039,,,21.0,,10.16,WellCompletion/New/Production or Monitoring/NA,1058336,,,519-1747-11-1,Injection,Alameda County Water District,11.0,WCR2021-002485,21.0,,,,-121.945441,,WGS84,05S,2/12/2021,Fremont,21.0,WA-IF,Alameda,01W,Mount Diablo,2/25/2021,CASCADE DRILLING LP,,7,
37.49857,,Centroid of Section,,,16,DWR North Central Region Office,Auger,,,6453 AUTO MALL PKWY,,,,10.0,,,WellCompletion/New/Production or Monitoring/NA,283326,Not Available at Conversion,,531-185-7,Monitoring,Alameda County Water District,5.0,WCR2008-003926,,e0083648,,,-121.97837,Derived from TRS,,05S,11/21/2008,FREMONT,10.0,MW 5,Alameda,01W,Mount Diablo,,WDC EXPLORATION & WELLS WDC EXPLORATION & WELLS,,8,2.0


In [0]:
# Convert the Spark shortage DataFrame to pandas; this rebinds pd_df_shortage,
# which was first built directly from the API response earlier in the notebook.
pd_df_shortage = df_shortage.toPandas()

In [0]:
# Build a pandas DataFrame from the station records collected from the API
# (list_data_stations is the list filled by the station download cell),
# then convert it to a Spark DataFrame.
pd_df_station = pd.DataFrame(data=list_data_stations)
df_station = spark.createDataFrame(pd_df_station)

# Persist the station data as a Parquet table in the MILESTONE2WATER database.
perm_table_name = "table_station"
df_station.write.format("parquet").saveAsTable(f"MILESTONE2WATER.{perm_table_name}")


In [0]:
# Build a pandas DataFrame from the groundwater records collected from the API
# (list_data_groundwater is the list filled by the groundwater download cells),
# then convert it to a Spark DataFrame.
pd_df_groundwater = pd.DataFrame(data=list_data_groundwater)
df_groundwater = spark.createDataFrame(pd_df_groundwater)

# Persist the groundwater data as a Parquet table in the MILESTONE2WATER database.
perm_table_name = "table_groundwater"
df_groundwater.write.format("parquet").saveAsTable(f"MILESTONE2WATER.{perm_table_name}")

In [0]:
%sql 
-- List the tables registered in the MILESTONE2WATER database.
show tables in MILESTONE2WATER;

database,tableName,isTemporary
milestone2water,table_completion,False
milestone2water,table_groundwater,False
milestone2water,table_shortage,False
milestone2water,table_station,False


In [0]:
%sql 
-- One labelled row count per table, combined into a single result set.
select "Well Completion Count" as Table_Name, count(1) as Row_Count from milestone2water.table_completion
union
select "Ground Water Count" as Table_Name, count(1) as Row_Count from milestone2water.table_groundwater
union
select "Household Shortage Count" as Table_Name, count(1) as Row_Count from milestone2water.table_shortage
union
select "Station Count" as Table_Name, count(1) as Row_Count from milestone2water.table_station




Table_Name,Row_Count
Well Completion Count,1043032
Ground Water Count,2530751
Household Shortage Count,3774
Station Count,45922


In [0]:
%sql 
-- Row count of the well-completion table.
select  count(1) as row_count  from milestone2water.table_completion



row_count
1043032


In [0]:
# Collapse to a single partition and write the station data to DBFS as CSV with a header row.
df_station.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/WaterWell/Station.csv")

In [0]:
#df_groundwater.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/WaterWell/groundwater.csv")



In [0]:

# List the contents of the groundwater CSV output path on DBFS (used to find the part-file name for the download URL).
dbutils.fs.ls(f"dbfs:/FileStore/WaterWell/groundwater.csv")

In [0]:
# Rebind df_wellcompletion to the contents of the saved well-completion table.
df_wellcompletion = spark.sql("SELECT * FROM  milestone2water.table_completion")

In [0]:
# Rebind df_shortage to the contents of the saved shortage table.
df_shortage = spark.sql("SELECT * FROM  milestone2water.table_shortage")

In [0]:
# Display the number of rows in the shortage Spark DataFrame.
df_shortage.count()

- https://adb-8268979028189023.3.azuredatabricks.net/?o=8268979028189023

- https://adb-8268979028189023.3.azuredatabricks.net/files/WaterWell/groundwater.csv/?o=8268979028189023



- https://adb-8268979028189023.3.azuredatabricks.net/files/WaterWell/groundwater.csv//part-00000-tid-755157886083895489-6b6aa279-6254-4e87-8d86-bdd5c4ae25df-55-1-c000.csv?o=8268979028189023

- https://adb-8268979028189023.3.azuredatabricks.net/files/WaterWell/Station.csv/part-00000-tid-8175063190170722800-757a8275-cd4b-40fb-a8d0-cd4e6697a20a-54-1-c000.csv?o=8268979028189023


- https://adb-8268979028189023.3.azuredatabricks.net/files/WaterWell/wellcompletion.csv/part-00000-tid-461276457967221884-5f4b19f1-0f0e-4a78-a13a-66ff90a9b027-66-1-c000.csv?o=8268979028189023

- https://adb-8268979028189023.3.azuredatabricks.net/files/WaterWell/shortage.csv/part-00000-tid-347414376517014295-70b4847f-5ffa-4c79-b389-736a993b950a-67-1-c000.csv?o=8268979028189023




#### Non-parquet files

- https://adb-8268979028189023.3.azuredatabricks.net/files/shortage.csv?o=8268979028189023
- https://adb-8268979028189023.3.azuredatabricks.net/files/wellcompletion.csv?o=8268979028189023
- https://adb-8268979028189023.3.azuredatabricks.net/files/groundwater.csv?o=8268979028189023

In [0]:
%sql

--select * from milestone2water.table_shortage

Well_Depth,StatusType,Status,Household_Support,Pump_Rate_Reduction,Water_Issues,Shortage_Type,Was_Issue_Resolved?,CREATE_DATE,Approximate_Issue_Start_Date,Measure_Date,Primary_Usages,Region,LONGITUDE,Well_to_Water_Depth,ID,CITY,Approximate_Repair_Cost,County,Report_Date,LATITUDE,_id,Additional_Info
,Outage,Undefined,,,Well is dry (no longer producing water),Dry well (groundwater),,05/24/2021,04/06/2021,,Household,Southern,-120.61821,,10230,TEMPLETON,,San Luis Obispo,2021-05-24T00:00:00,35.50495,2830,
166,Resolved,Interim Solution,owner-occupied,"Yes, about a 30-40% reduction","Reduction in water pressure, lower flows.",Dry well (groundwater),"No, trucking in water",05/25/2021,05/04/2021,05/18/2021,Household,Inland,-119.78637,148,10247,Lemoore,,Kings,2021-05-25T00:00:00,36.30101,2831,
200,Resolved,Interim Solution,owner-occupied,It's no longer producing any water,Well is dry (no longer producing water),Dry well (groundwater),"No, trucking in water",05/25/2021,05/14/2021,05/20/2021,Household,Inland,-119.995,200,10248,Madera,,Madera,2021-05-25T00:00:00,37.05132,2832,
,Resolved,Interim Solution,owner-occupied,It's no longer producing any water,Well is dry (no longer producing water),Dry well (groundwater),"No, trucking in water",05/26/2021,05/21/2021,05/25/2021,Household,Inland,-120.14357,0,10267,Madera,,Madera,2021-05-26T00:00:00,37.01731,2833,
200,Resolved,Interim Solution,owner-occupied,It's no longer producing any water,Well is dry (no longer producing water),Dry well (groundwater),"No, trucking in water",05/26/2021,05/24/2021,05/24/2021,Household,Inland,-120.328507,195,10268,Chowchilla,,Madera,2021-05-26T00:00:00,37.020235,2834,
117,Resolved,Resolved,owner-occupied,too low to get accurate reading,"Reduction in water pressure, lower flows. ,Well is catching air, have to wait to be able to pump.",Dry well (groundwater),"Yes, lowered the pump bowl ,No, cannot afford to finance solutions.",05/30/2021,08/15/2015,11/18/1977,Household,Inland,-122.18107,93 ft.,10287,corning,,Tehama,2021-05-30T00:00:00,39.94158,2835,"My wife and I are over 70 and enjoy gardening as a supplement to store bought we have one small garden (10 by 4) but would like to expand but with our water situation it's not possible. I am afraid one day the water level will drop below our pump and stay there. I don't know why our neighbor is not experiencing the same problems, different depth? different strata?"
53',Resolved,Interim Solution,owner-occupied,,Well is dry (no longer producing water),Dry well (groundwater),"No, trucking in water",05/31/2021,05/30/2021,,Household,Inland,-122.21164,,10307,Orland,,Glenn,2021-05-31T00:00:00,39.74636,2836,
75,Outage,Outage,owner-occupied,85% reduction,"Reduction in water pressure, lower flows. ,Reduction in water quality ,Well is catching air, have to wait to be able to pump. ,Well is pumping sand, muddy water.",Dry well (groundwater),"Yes, lowered the pump bowl ,Pump burned up and have one on order and waiting to have a service company install it ,No, cannot afford to finance solutions. ,No, getting water from our neighbors with a hose",05/31/2021,05/01/2021,,Household,Inland,-122.15781,55,10308,Orland,"Dropping pump $500, new pump $1000. To start",Glenn,2021-05-31T00:00:00,39.73126,2837,Everyone around us and neighbors are having the same problems and with our water table being so low we will have to drill the well deeper but the wait list in Orland and Glenn County is months out and we cannot afford that cost.
,Resolved,Interim Solution,renter-occupied,,Well is dry (no longer producing water),Dry well (groundwater),"No, trucking in water",06/01/2021,05/21/2021,,Household,Inland,-122.11097,,10309,Orland,,Glenn,2021-06-01T00:00:00,39.70626,2838,Family of 5 being evicted due to well running dry. No available housing for family of 5 nearby.
330,Outage,Outage,owner-occupied,Not sure,"Well is catching air, have to wait to be able to pump.",Dry well (groundwater),"No, cannot afford to finance solutions.",06/01/2021,03/01/2021,05/10/2021,Household,Inland,-122.33318,260,10310,Corning,"I?ve spent 3,000 on pump and lowering Not fixed!",Tehama,2021-06-01T00:00:00,39.93327,2839,


In [0]:

# Load the whole shortage table into a Spark DataFrame.
spark_df_shortage = spark.sql("""select * from milestone2water.table_shortage """)

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=b042e2da-6536-449d-95b8-d85fa08825de' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>