In [1]:
!pip install more_pyspark



In [2]:
!pip install composable --upgrade

Collecting composable
  Downloading composable-0.5.4-py3-none-any.whl (8.5 kB)
Installing collected packages: composable
  Attempting uninstall: composable
    Found existing installation: composable 0.5.3
    Uninstalling composable-0.5.3:
      Successfully uninstalled composable-0.5.3
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
more-dfply 0.2.10 requires composable<0.3.0,>=0.2.5, but you have composable 0.5.4 which is incompatible.
Successfully installed composable-0.5.4


# Lab 4 - Creating partitioned parquet files

In this lab, we will perform our first round of data preparation by writing the larger files (`XREF` and the yearly `parcel` files) to the `parquet` format.  

In the process, we will discuss and investigate an important concept in managing lots of data: the principle of locality. Big data problems are often IO bound, meaning that almost all of the time/resources will be spent managing the input/output of data. The principle of locality holds that data that has been accessed is often reused over a short period of time (temporal locality) and that data stored in nearby locations tends to be used at similar points in a program (spatial locality). The `parquet` format allows us to partition a data set to leverage these properties. *The correct partitioning can result in an orders-of-magnitude speedup in processing time!*



## The Principle of Locality.

<img src="./img/locality.png" width="800">

We can leverage the behavior of the operating system (OS), in particular its loading of nearby chunks of data and keeping that data in memory for a time, by partitioning our data so that similar data is stored together.
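To make the idea concrete, here is a small pure-Python simulation (not Spark code; the 100-rows-per-page size is an arbitrary assumption). Rows are stored in fixed-size pages, and we count how many distinct pages a query for one lake must touch when the rows are stored grouped by lake versus in random order.

```python
import random

PAGE_SIZE = 100  # assumed number of rows per disk page/chunk


def pages_touched(rows, lake_id, page_size=PAGE_SIZE):
    """Count the distinct pages holding at least one row for lake_id."""
    return len({i // page_size for i, row in enumerate(rows) if row["lake"] == lake_id})


def make_rows(n_lakes=50, rows_per_lake=200, seed=0):
    """Build a toy table (one 'lake' key per row) stored in random order."""
    rows = [{"lake": lake, "value": i} for lake in range(n_lakes) for i in range(rows_per_lake)]
    random.Random(seed).shuffle(rows)
    return rows


shuffled = make_rows()
grouped = sorted(shuffled, key=lambda row: row["lake"])  # similar rows stored together

print("shuffled:", pages_touched(shuffled, lake_id=7))
print("grouped: ", pages_touched(grouped, lake_id=7))  # 2
```

With 200 rows for the lake and 100 rows per page, the grouped layout touches only 2 pages, while the shuffled layout touches many times more, and every extra page is extra IO.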

## We need to group the data by lake and distance to the lake

<img src="./img/row_proximity.png" width="800">

## Problem 1 - Understanding the big picture and table keys

**Tasks.**  

1. Explain why, in the case of parcel data, it makes sense to group the rows by lake ID and distance to the lake.
2. Neither of these columns is present in the parcel data files.  How will we go about adding this information?

> <font color="orange"> We can take the lake ID and distance from the XREF. We need to group by lake ID to get data on a lake-by-lake basis. </font>
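Adding the lake information amounts to a join: each parcel row picks up the site code and distances from its matching `XREF` row. Below is a minimal pure-Python sketch of an inner join, assuming (as the notebook does later) that the parcel centroid coordinates act as the join key. The `EMV_TOTAL` values are hypothetical; the XREF values are taken from the sample output further down.

```python
def inner_join(left, right, keys):
    """Inner join two lists of dicts on the given key columns."""
    index = {}
    for row in right:
        index.setdefault(tuple(row[k] for k in keys), []).append(row)
    joined = []
    for row in left:
        # Rows with no match on the right are dropped (inner join semantics)
        for match in index.get(tuple(row[k] for k in keys), []):
            joined.append({**row, **match})
    return joined


# Hypothetical toy rows; the real data comes from the parcel and XREF files.
parcels = [
    {"centroid_lat": "44.94283", "centroid_long": "-93.11451", "EMV_TOTAL": "250000"},
    {"centroid_lat": "45.00000", "centroid_long": "-93.00000", "EMV_TOTAL": "180000"},
]
xref = [
    {"centroid_lat": "44.94283", "centroid_long": "-93.11451",
     "Monit_MAP_CODE1": "19007900-01", "Distance_Parcel_Lake_meters": "2571.526792225838"},
]

result = inner_join(parcels, xref, keys=["centroid_lat", "centroid_long"])
print(len(result))                   # 1: the second parcel has no XREF match
print(result[0]["Monit_MAP_CODE1"])  # 19007900-01
```

An inner join silently drops unmatched parcels (the second row here), which is worth keeping in mind when investigating mismatches.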

## Problem 2 - Writing the XREF to a partitioned parquet "file"

**Tasks.**

1. Load the `XREF` data and select the relevant columns (lake ID, centroid lat & long, distance to the lake).
2. Create a new categorical variable with three categories based on distance to the lake: within 500 m, between 501 and 1600 m, and over 1600 m.
3. Write the table to a partitioned `parquet` "file".
4. Read in each of these files and suggest the columns that will be used to join the tables.
5. To understand the relationship (one-to-one; one-to-many; many-to-many) between tables, perform an aggregation on each table to determine whether there are one or many rows per key.
6. Based on the results of the last task, suggest a join type and justify your response.
7. For each table, create a query that results in a column with one unique key per row.
8. Perform the join suggested in **6.** and investigate any mismatches. Document your findings and suggest necessary remedies.
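The relationship question comes down to how many rows share each key value in each table. Here is a minimal pure-Python sketch of that check (in Spark the counts would come from a `groupBy(...).count()`); the key lists are hypothetical, and the classifier does not distinguish one-to-many from many-to-one.

```python
from collections import Counter


def max_rows_per_key(keys):
    """Largest number of rows sharing a single key value."""
    return max(Counter(keys).values())


def relationship(left_keys, right_keys):
    """Classify the join relationship between two key columns (direction ignored)."""
    left_many = max_rows_per_key(left_keys) > 1
    right_many = max_rows_per_key(right_keys) > 1
    if left_many and right_many:
        return "many-to-many"
    if left_many or right_many:
        return "one-to-many"
    return "one-to-one"


# Hypothetical example: many XREF rows (parcels) per monitoring site,
# but only one row per site in a site-level table.
xref_sites = ["19007900-01", "19007900-01", "27010700-01"]
site_table = ["19007900-01", "27010700-01"]
print(relationship(xref_sites, site_table))  # one-to-many
```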

**Note.** The code for partitioning and writing a parquet file for the water quality data is provided as an example. 

#### Example - Writing a partitioned water quality file

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from more_pyspark import to_pandas
spark = (SparkSession.builder.appName('Ops')
         .getOrCreate())



22/12/03 10:59:30 WARN Utils: Your hostname, lu4543hm221 resolves to a loopback address: 127.0.1.1; using 172.21.162.18 instead (on interface eth0)
22/12/03 10:59:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/03 10:59:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [17]:
xref_file = spark.read.csv('./data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt',
                    header=True,
                    sep='\t'
)

xref_file.take(2)>>to_pandas

Unnamed: 0,Parcel_PIN,Monit_MAP_CODE1,Monit_SITE_CODE,Monit_LAKE_SITE,Distance_Parcel_Monitoring_Site_meters,Lake_Hydroid,Distance_Parcel_Lake_meters,centroid_long,centroid_lat,Parcel_pkey
0,,19007900-01,19007900,1,2815.4927104148846,110517277058,2571.526792225838,-93.11451,44.94283,2163034
1,,19007900-01,19007900,1,2753.474687531216,110517277058,2515.3738022144425,-93.11539,44.94234,2163035


In [18]:
# Keep the site code, centroid coordinates, and distances, then bin the parcel-to-lake distance
# into three categories (cast to double explicitly, since the CSV columns are read as strings)
lake_dist = col('Distance_Parcel_Lake_meters').cast('double')
xref_selected = (xref_file
                .select('Monit_MAP_CODE1', 'centroid_lat', 'centroid_long',
                        'Distance_Parcel_Monitoring_Site_meters', 'Distance_Parcel_Lake_meters')
                .withColumn('distance', when(lake_dist < 500, 'Less Than 500')
                                        .when(lake_dist <= 1600, 'Between 500 and 1600')
                                        .when(lake_dist > 1600, 'Over 1600')
                                        .otherwise('Unknown Distance'))
)


(xref_selected.where(col('distance') == 'Less Than 500')).take(10) >> to_pandas

Unnamed: 0,Monit_MAP_CODE1,centroid_lat,centroid_long,Distance_Parcel_Monitoring_Site_meters,Distance_Parcel_Lake_meters,distance
0,27010700-01,44.98875,-93.4634,722.8180666427731,498.0579509656829,Less Than 500
1,27010700-01,44.98903,-93.46419,653.5952330774182,428.4444162456371,Less Than 500
2,27010700-01,44.98903,-93.46458,628.0660359863293,400.6929258452405,Less Than 500
3,27010700-01,44.98902,-93.46496,604.3187015484795,374.1914771957898,Less Than 500
4,27010700-01,44.98787,-93.46512,678.9834948807496,433.7585084081055,Less Than 500
5,27010700-01,44.9876,-93.46511,701.3156946349593,454.6024014556449,Less Than 500
6,27010700-01,44.98729,-93.46509,728.0777188180051,480.1955101268163,Less Than 500
7,27010700-01,44.98787,-93.46434,723.2651110735995,481.83398431552473,Less Than 500
8,27010700-01,44.98815,-93.46435,702.1247815817089,463.45048693718456,Less Than 500
9,27010700-01,44.98842,-93.46436,682.4421586389875,447.03773991223926,Less Than 500


In [21]:
## Already run: the parquet output exists, so this write is commented out.

# %%timeit -n 1 -r 1

# (xref_selected
#  .write
#  .partitionBy('Monit_MAP_CODE1')
#  .mode('overwrite')
#  .parquet('xref_selected.parquet')
# )

                                                                                

14.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [19]:
from more_pyspark import to_pandas

water_quality = spark.read.csv('./data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/mces_lakes_1999_2014.txt',
                              header = True,
                              sep='\t')
water_quality.take(2) >> to_pandas

22/12/01 11:52:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,PROJECT_ID,DATA_SET_TITLE,LAKE_NAME,CITY,COUNTY,DNR_ID_Site_Number,MAJOR_WATERSHED,WATER_PLANNING_AUTHORITY,LAKE_SITE_NUMBER,START_DATE,...,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude
0,7108,Citizen Assisted Monitoring Program (CAMP) for...,Acorn Lake,Oakdale,Washington,82010200-01,Lower St. Croix River,Valley Branch WD,1,2006-04-16,...,,1.0,Approved,m,,0.156,Approved,mg/L,-92.97171054,45.01655642
1,7108,Citizen Assisted Monitoring Program (CAMP) for...,Acorn Lake,Oakdale,Washington,82010200-01,Lower St. Croix River,Valley Branch WD,1,2006-05-01,...,,,,m,,,,mg/L,-92.97171054,45.01655642


In [9]:
## Same as above: the parquet output already exists, so this write is commented out.

# %%timeit -n 1 -r 1

# (water_quality
#  .write
#  .partitionBy('DNR_ID_Site_Number')
#  .mode('overwrite')
#  .parquet('water_quality.parquet')
# )


5.74 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


                                                                                

In [20]:
waterparq = spark.read.parquet('water_quality.parquet')
xref_parq = spark.read.parquet('xref_selected.parquet/')

                                                                                

In [21]:
xref_parq.count()

                                                                                

2688766

In [23]:
xref_parq

DataFrame[centroid_lat: string, centroid_long: string, Distance_Parcel_Monitoring_Site_meters: string, Distance_Parcel_Lake_meters: string, distance: string, Monit_MAP_CODE1: string]

In [30]:
xref_distinct = xref_parq.drop_duplicates(['Monit_MAP_CODE1'])
xref_distinct

DataFrame[centroid_lat: string, centroid_long: string, Distance_Parcel_Monitoring_Site_meters: string, Distance_Parcel_Lake_meters: string, distance: string, Monit_MAP_CODE1: string]

### Part 3 - Inspecting the partitioned `parquet` file

**Tasks.** Inspect the resulting "file" (actually a folder) from the last step and answer the following questions.

1. What impact did the partitioning have on the way the data was saved?
2. How would this structure help `pyspark` apply predicate pushdown?
3. How would this structure help via the principle of locality?
4. When working with a cluster of machines, operations such as `groupby` are WIDE operations, meaning they generally need to shuffle data between machines.  Such a shuffle is *very* expensive.  In a future lab, we will be creating features for each lake by grouping and aggregating on the lakes and years.  How would applying a similar structure to the parcel data help in this case? **Hint.** Remember that the data will be distributed across multiple machines using the partitions, i.e., each machine will load all or some of the same partition(s).
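A partitioned `parquet` folder uses Hive-style sub-folders named `column=value`. The pure-Python sketch below uses hypothetical folder names that mimic the `partitionBy('Monit_MAP_CODE1', 'distance')` layout to show the idea behind partition pruning (the directory-level form of predicate pushdown): a filter on a partition column can be answered from the folder names alone, so non-matching folders are never opened. It illustrates the idea only, not Spark's actual implementation.

```python
def parse_partition(path):
    """Turn 'a=1/b=2' into {'a': '1', 'b': '2'}."""
    return dict(part.split("=", 1) for part in path.split("/") if "=" in part)


def prune(paths, **filters):
    """Keep only partition folders whose values match every filter."""
    kept = []
    for path in paths:
        values = parse_partition(path)
        if all(values.get(col) == val for col, val in filters.items()):
            kept.append(path)
    return kept


# Hypothetical folder names; real ones would sit inside e.g. xref_selected.parquet.
folders = [
    "Monit_MAP_CODE1=19007900-01/distance=Over 1600",
    "Monit_MAP_CODE1=27010700-01/distance=Less Than 500",
    "Monit_MAP_CODE1=27010700-01/distance=Over 1600",
]
print(prune(folders, Monit_MAP_CODE1="27010700-01", distance="Less Than 500"))
# ['Monit_MAP_CODE1=27010700-01/distance=Less Than 500']
```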


> <font color="orange"> The data is partitioned similarly: by the XREF monitoring map code for the XREF data and by DNR site number for the water quality data, so each sub-folder holds the data for one site. I might be wrong about PySpark's predicate pushdown, but using these groupings to combine and grab the water quality information for each site should be easy through PySpark's method chaining. </font>

## <font color="blue"> Key </font>

> <font color="orange"> <b>1.</b> The "file" is actually a directory with sub-folders for each combination of labels of the partitioning variables.  <b>2.</b> <code>pyspark</code> can use the directory structure to skip entirely any combination that we filter out. <b>3.</b> Having the data partitioned/sorted should also help with the principle of locality by keeping similar data close together and thus in memory at the same time.  When spreading our data across multiple machines, this will be particularly advantageous, as each machine can just load some/all of a partition, saving us from having to shuffle data between machines. </font>
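The point about machines loading whole partitions can be checked with a toy count. This pure-Python sketch (the 4 machines, 8 lakes, and the "lake modulo machines" ownership rule are all assumptions) counts how many rows must move before a group-by on lake when rows are dealt out round-robin versus when each lake's rows already live on the machine that aggregates that lake.

```python
N_MACHINES = 4
rows = [{"lake": lake, "value": i} for lake in range(8) for i in range(10)]  # 80 rows


def owner(lake):
    """Machine assumed to aggregate this lake (a simple hash-partitioning rule)."""
    return lake % N_MACHINES


def rows_to_move(layout):
    """Count rows that are not already on their lake's owning machine."""
    return sum(1 for machine, row in layout if machine != owner(row["lake"]))


# Layout 1: rows dealt round-robin to machines, ignoring the lake.
round_robin = [(i % N_MACHINES, row) for i, row in enumerate(rows)]
# Layout 2: rows partitioned by lake, so each lake already sits on its owner.
by_lake = [(owner(row["lake"]), row) for row in rows]

print(rows_to_move(round_robin))  # 60
print(rows_to_move(by_lake))      # 0
```

Partitioning by the grouping key turns the wide `groupby` into work each machine can do on its own partitions, with no shuffle.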

### Part 4 - Filtering parcels and joining the lake ID

Next, we will partition and write each of the 2004-2015 parcel files to a `parquet` "file".  To do this, complete each of the following tasks.

**Tasks.**

1. Write a helper function that takes a parcel file path as input, reads the corresponding CSV, selects the common columns (imported from `parcel.py`), and joins on the necessary info from the `XREF` (lake ID, distance to the lake, the distance category defined above, and centroid lat & long).
2. Write a helper function that takes a `year` and a parcel `df`, partitions the data by the lake ID and distance category, and writes it to a "file" named `parcel_<year>.parquet`.
3. Test the two helper functions on one of the parcel file years to make sure they are bug free.
4. Write a pipe with a familiar shape
    * Use `glob` to get all parcel file paths
    * Filter the paths to 2004-2015
    * split into year/df tuples using `get_year` and your helper function from **1.**.
    * star_map your helper function from **2.** to write each of the files.
    
**Important note.** Each parcel file took 10+ minutes on my laptop, so running the whole pipe will take a while.  Pick a convenient time and be sure to plug in your laptop!
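The pipe shape described in task 4 can be sketched in pure Python with only the standard library (`glob`, `re`, `itertools.starmap`) instead of `composable`. The `read_df`/`write_df` callables are placeholders (assumptions for illustration). Note the pairs here are `(year, df)` as in the task list, whereas the notebook's `to_parquet` below takes `(df, year)`.

```python
import glob
import os
import re
from itertools import starmap

YEAR_RE = re.compile(r"(\d{4})_metro_tax_parcels\.txt$")


def parcel_paths(folder):
    """All yearly parcel files in a folder (any year; filtering happens later)."""
    return glob.glob(os.path.join(folder, "*_metro_tax_parcels.txt"))


def get_year(path):
    """Extract the 4-digit year from a parcel file name (None if absent)."""
    match = YEAR_RE.search(path)
    return int(match.group(1)) if match else None


def in_range(path, first=2004, last=2015):
    """True if the file's year falls within [first, last]."""
    year = get_year(path)
    return year is not None and first <= year <= last


def run_pipe(paths, read_df, write_df):
    """Filter paths to 2004-2015, read each, then star-map the writer over (year, df) pairs."""
    pairs = [(get_year(p), read_df(p)) for p in sorted(paths) if in_range(p)]
    return list(starmap(write_df, pairs))
```

For example, `run_pipe(parcel_paths('./data/MinneMUDAC_raw_files/MinneMUDAC_raw_files'), read_df, write_df)` would process each year in order, with 2003 or 2016 files (if any existed) filtered out.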

In [67]:
from composable.glob import glob
from composable.strict import map, star_map, filter, sorted
from composable.sequence import reduce
from composable import pipeable
from pyspark.sql.functions import lit
import pandas as pd
from composable.tuple import split_by

In [31]:
# Place your code/thoughts in one or more code/markdown cells, respectively.

# parcel1 = sorted(glob('./data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/200[456789]*parcel*.txt'))
# parcel2 = sorted(glob('./data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/201*parcel*.txt'))
# parcel_files = (parcel1+parcel2)
# from parcel import sort_common_cols_2004_to_2015
# from pyspark.sql.functions import col
# read_parcel = lambda path: spark.read.csv(path, header=True, sep='|').select(sort_common_cols_2004_to_2015)
# union_files = lambda out_df, df: out_df.union(df).distinct()

# xref = (xref_distinct
#     .select('centroid_long', 'centroid_lat', 'distance', 'MONIT_MAP_CODE1'))


In [49]:
parcel_files1 = sorted(glob('./data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/200[456789]*parcel*.txt'))
parcel_files2 = sorted(glob('./data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/201*parcel*.txt'))
from parcel import sort_common_cols_2004_to_2015
from pyspark.sql.functions import col
parcel_files = (parcel_files1+parcel_files2)
parcel_files

['./data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2004_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2005_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2006_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2007_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2008_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2009_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2010_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2011_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2012_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2013_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2014_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/MinneMUDAC_raw_files/2015_metro_tax_parcels.txt']

In [52]:
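from parcel import sort_common_cols_2004_to_2015
# Read a pipe-delimited parcel file, keep the common columns, and join the XREF info on the centroid coordinates
read_parcel = lambda path: (spark.read.csv(path, header=True, sep='|').select(sort_common_cols_2004_to_2015)
                            .join(xref_parq, on=['centroid_lat', 'centroid_long'], how='inner'))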

In [63]:
import re
# Capture the 4-digit year in names like 2004_metro_tax_parcels.txt (raw string avoids the invalid '\d' escape warning)
compile_yr = re.compile(r'(\d{4})_metro_tax_parcels\.txt$')
get_year = lambda path: compile_yr.search(path).group(1)

In [79]:
joined_parcels = (parcel_files
                    >>map(split_by((read_parcel, get_year)))
                    )
joined_parcels[1]

(DataFrame[centroid_lat: string, centroid_long: string, ACRES_DEED: string, ACRES_POLY: string, AGPRE_ENRD: string, AGPRE_EXPD: string, AG_PRESERV: string, BASEMENT: string, BLDG_NUM: string, BLOCK: string, CITY: string, CITY_USPS: string, COOLING: string, COUNTY_ID: string, DWELL_TYPE: string, EMV_BLDG: string, EMV_LAND: string, EMV_TOTAL: string, FIN_SQ_FT: string, GARAGESQFT: string, GREEN_ACRE: string, HEATING: string, HOME_STYLE: string, LANDMARK: string, LOT: string, MULTI_USES: string, NUM_UNITS: string, OPEN_SPACE: string, OWNER_MORE: string, OWNER_NAME: string, OWN_ADD_L1: string, OWN_ADD_L2: string, OWN_ADD_L3: string, PARC_CODE: string, PIN: string, PLAT_NAME: string, PREFIXTYPE: string, PREFIX_DIR: string, SALE_DATE: string, SALE_VALUE: string, SCHOOL_DST: string, SPEC_ASSES: string, STREETNAME: string, STREETTYPE: string, SUFFIX_DIR: string, Shape_Area: string, Shape_Leng: string, TAX_ADD_L1: string, TAX_ADD_L2: string, TAX_ADD_L3: string, TAX_CAPAC: string, TAX_EXEMPT: st

In [80]:
# %%timeit -n 1 -r 1
# Write one year's joined parcels, partitioned by monitoring site code and distance category
to_parquet = lambda df, year: df.write.partitionBy('Monit_MAP_CODE1', 'distance').mode('overwrite').parquet(f'parcel_{year}.parquet')


No need to run the cell below: it has already been run, and the parcel parquet files were created in about 30 minutes. Do *not* run the cell below.

In [81]:
# (joined_parcels
#     >>star_map(to_parquet)
#     )


22/12/01 15:54:14 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
22/12/01 15:54:14 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers



22/12/01 15:56:59 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
22/12/01 15:57:00 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.



22/12/01 16:05:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




22/12/01 16:06:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




22/12/01 16:08:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers



22/12/01 16:16:26 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




22/12/01 16:17:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

[None, None, None, None, None, None, None, None, None, None, None, None]