# Project: Apache Spark

Objective:
 - to see if a farmers market is more likely to be in rich zip code
 - is to familiarize myself with Apache Spark
 - to use a Random Forest Model and optimize it
 - perform hyperparameter tuning 

In [0]:
# read data
taxes_2013 = (spark.read
  .option("header", "true")
  .csv("dbfs:/databricks-datasets/data.gov/irs_zip_code_data/data-001/2013_soi_zipcode_agi.csv"))

markets = (spark.read
  .option("header", "true")
  .csv("dbfs:/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/market_data.csv"))

In [0]:
# register dataframes as SparkSQL Tables
taxes_2013.createOrReplaceTempView("taxes2013")

markets.createOrReplaceTempView("markets")

list tables in workspace using SQL commands:

In [0]:
# show the dataframes
# equivalent to %sql select * from taxes2013
display(taxes_2013)

STATEFIPS,STATE,zipcode,agi_stub,N1,MARS1,MARS2,MARS4,PREP,N2,NUMDEP,A00100,N02650,A02650,N00200,A00200,N00300,A00300,N00600,A00600,N00650,A00650,N00700,A00700,N00900,A00900,N01000,A01000,N01400,A01400,N01700,A01700,SCHF,N02300,A02300,N02500,A02500,N26270,A26270,N02900,A02900,N03220,A03220,N03300,A03300,N03270,A03270,N03150,A03150,N03210,A03210,N03230,A03230,N03240,A03240,N04470,A04470,A00101,N18425,A18425,N18450,A18450,N18500,A18500,N18300,A18300,N19300,A19300,N19700,A19700,N04800,A04800,N05800,A05800,N09600,A09600,N07100,A07100,N07300,A07300,N07180,A07180,N07230,A07230,N07240,A07240,N07220,A07220,N07260,A07260,N09400,A09400,N10600,A10600,N59660,A59660,N59720,A59720,N11070,A11070,N10960,A10960,N06500,A06500,N10300,A10300,N85330,A85330,N85300,A85300,N11901,A11901,N11902,A11902
1,AL,0,1,870380.0,488030.0,122290.0,247000.0,500770.0,1452580.0,571240.0,11255896.0,870380.0,11444868.0,700700.0,8889326.0,103290.0,77952.0,46870.0,75071.0,40890.0,47416.0,15650.0,6538.0,146240.0,824487.0,37970.0,23583.0,38400.0,221790.0,111060.0,1066291.0,8800.0,49720.0,187559.0,35370.0,62791.0,8980.0,10323.0,155190.0,186574.0,3300.0,747.0,140.0,487.0,8950.0,33584.0,2540.0,6212.0,20760.0,17533.0,5900.0,16956.0,30.0,2.0,57090.0,794815.0,884758.0,23440.0,22991.0,25940.0,20686.0,29140.0,25144.0,54970.0,84317.0,27140.0,151005.0,42500.0,111909.0,338560.0,1874627.0,333040.0,196535.0,0.0,0.0,108740.0,37220.0,2400.0,56.0,8850.0,2899.0,33010.0,16969.0,32080.0,5457.0,33530.0,10160.0,2650.0,677.0,117420.0,152943.0,807980.0,2277816.0,399710.0,1174659.0,371280.0,1057938.0,258200.0,327115.0,80350.0,76412.0,255750.0,159189.0,371450.0,318777.0,0.0,0.0,0.0,0.0,59580.0,44367.0,767170.0,2005593.0
1,AL,0,2,490960.0,195840.0,155230.0,125280.0,286130.0,1027850.0,383240.0,17632481.0,490960.0,17810952.0,427730.0,14501798.0,97400.0,81216.0,42440.0,92536.0,37100.0,61479.0,58830.0,29891.0,61400.0,252768.0,32710.0,54639.0,34860.0,320115.0,101580.0,1764445.0,8890.0,23590.0,84183.0,87130.0,542763.0,10690.0,49672.0,95190.0,177106.0,12310.0,2968.0,240.0,975.0,6870.0,31725.0,6350.0,20932.0,36070.0,36540.0,2360.0,5915.0,60.0,35.0,115520.0,1717953.0,4390768.0,83910.0,140609.0,25240.0,32077.0,77400.0,61241.0,113360.0,262971.0,72830.0,405101.0,94310.0,341396.0,470370.0,8602452.0,468160.0,1038975.0,0.0,0.0,217760.0,194669.0,6030.0,262.0,28850.0,16248.0,46050.0,47721.0,65150.0,12104.0,124900.0,110801.0,12820.0,4944.0,37210.0,77948.0,480870.0,2041535.0,132780.0,259313.0,112050.0,223466.0,104610.0,156274.0,37270.0,34254.0,367070.0,844185.0,386570.0,935430.0,0.0,0.0,0.0,0.0,65850.0,94281.0,418070.0,1192755.0
1,AL,0,3,258810.0,72710.0,146880.0,32860.0,157670.0,594910.0,189340.0,15916085.0,258810.0,16070153.0,223860.0,12289284.0,86850.0,80627.0,40800.0,118256.0,36250.0,81316.0,68480.0,42607.0,39840.0,259836.0,32420.0,84137.0,27970.0,348231.0,72490.0,1727323.0,7940.0,11640.0,45125.0,59580.0,849865.0,10940.0,94229.0,68820.0,151722.0,9550.0,2369.0,190.0,1100.0,4800.0,24606.0,5070.0,20023.0,26750.0,26173.0,2290.0,5165.0,110.0,171.0,105460.0,1797725.0,6537061.0,85350.0,216962.0,16720.0,25271.0,85670.0,78046.0,104930.0,347945.0,79910.0,498801.0,90350.0,396144.0,257690.0,10164831.0,257080.0,1401115.0,100.0,58.0,115430.0,170908.0,7860.0,542.0,18540.0,9741.0,25080.0,31702.0,13300.0,2074.0,77900.0,118242.0,10740.0,4167.0,26610.0,65373.0,255340.0,1749879.0,310.0,25.0,520.0,47.0,10470.0,13259.0,18980.0,17402.0,245810.0,1229923.0,249040.0,1310745.0,0.0,0.0,0.0,0.0,58810.0,125566.0,194360.0,552938.0
1,AL,0,4,163290.0,24860.0,126480.0,9790.0,98920.0,424160.0,134370.0,14161207.0,163290.0,14288572.0,143290.0,10773848.0,72130.0,71086.0,36370.0,120329.0,32710.0,84330.0,62060.0,44250.0,27380.0,214668.0,28960.0,105947.0,21820.0,357541.0,52290.0,1537085.0,6240.0,6550.0,25907.0,39400.0,752960.0,10280.0,117239.0,48480.0,123525.0,9130.0,2478.0,270.0,2241.0,3310.0,18757.0,3780.0,17325.0,17920.0,20191.0,830.0,1736.0,170.0,303.0,86180.0,1662813.0,7527384.0,73820.0,256947.0,10260.0,18251.0,77040.0,80626.0,86010.0,382943.0,71440.0,505429.0,77170.0,399008.0,163020.0,9939247.0,162740.0,1438828.0,290.0,307.0,77550.0,131562.0,7760.0,804.0,15740.0,8662.0,18370.0,23848.0,0.0,0.0,55010.0,90930.0,7420.0,2594.0,18720.0,51158.0,161800.0,1658126.0,0.0,0.0,0.0,0.0,720.0,836.0,13430.0,12398.0,161590.0,1306838.0,162050.0,1374682.0,0.0,0.0,0.0,0.0,41950.0,113997.0,116830.0,385953.0
1,AL,0,5,192050.0,16930.0,168170.0,5450.0,115290.0,538120.0,177800.0,25777351.0,192050.0,26053920.0,172590.0,19141939.0,110480.0,149150.0,66710.0,312926.0,61630.0,231997.0,106370.0,103407.0,36380.0,567439.0,56050.0,404166.0,29250.0,738685.0,65310.0,2456580.0,8240.0,5360.0,22840.0,41900.0,921766.0,23860.0,666191.0,67940.0,263329.0,12390.0,3262.0,1230.0,17607.0,7140.0,50624.0,5400.0,29276.0,19320.0,19218.0,4750.0,10113.0,710.0,2710.0,141460.0,3323093.0,19436460.0,128960.0,697624.0,10410.0,23707.0,132970.0,193679.0,141390.0,969162.0,121320.0,1019585.0,131150.0,869141.0,191870.0,19721991.0,191690.0,3440302.0,2270.0,4948.0,89410.0,123786.0,17830.0,4900.0,19680.0,10767.0,21860.0,30646.0,0.0,0.0,44870.0,61114.0,9410.0,2972.0,27160.0,110673.0,190480.0,3584131.0,0.0,0.0,0.0,0.0,80.0,100.0,20040.0,19089.0,191280.0,3312046.0,191470.0,3460632.0,650.0,146.0,240.0,63.0,73380.0,358812.0,110760.0,441951.0
1,AL,0,6,46890.0,3530.0,42190.0,860.0,36250.0,137410.0,48270.0,20346741.0,46890.0,20752068.0,41230.0,10178838.0,37970.0,271416.0,29970.0,663873.0,28610.0,523865.0,24150.0,85076.0,12670.0,822565.0,28660.0,1569967.0,7930.0,366219.0,13000.0,714100.0,2320.0,530.0,2434.0,9680.0,245327.0,21220.0,4533514.0,20970.0,392075.0,1000.0,251.0,2730.0,85219.0,7980.0,82472.0,1090.0,10402.0,0.0,0.0,0.0,0.0,1600.0,54845.0,44170.0,2375957.0,19397505.0,42340.0,767152.0,1390.0,4611.0,42210.0,158794.0,44160.0,956260.0,34080.0,452100.0,42110.0,831896.0,46860.0,17547726.0,46850.0,4781679.0,14810.0,73078.0,19880.0,88589.0,13400.0,49046.0,2920.0,1601.0,0.0,0.0,0.0,0.0,0.0,0.0,1470.0,486.0,13920.0,120102.0,46350.0,4697703.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,46820.0,4674681.0,46840.0,4918876.0,17990.0,31038.0,22940.0,72351.0,25160.0,707320.0,14840.0,252825.0
1,AL,35004,1,1530.0,950.0,260.0,300.0,800.0,2250.0,710.0,19524.0,1530.0,19851.0,1180.0,14383.0,230.0,183.0,90.0,128.0,80.0,82.0,40.0,12.0,220.0,1657.0,60.0,4.0,110.0,806.0,240.0,2129.0,0.0,50.0,238.0,100.0,170.0,20.0,38.0,260.0,327.0,0.0,0.0,0.0,0.0,30.0,85.0,0.0,0.0,40.0,31.0,30.0,62.0,0.0,0.0,140.0,1848.0,2063.0,50.0,49.0,80.0,58.0,80.0,43.0,140.0,167.0,80.0,503.0,100.0,194.0,670.0,3894.0,660.0,411.0,0.0,0.0,190.0,58.0,0.0,0.0,0.0,0.0,50.0,27.0,60.0,10.0,50.0,14.0,0.0,0.0,200.0,292.0,1390.0,3275.0,560.0,1479.0,510.0,1287.0,350.0,409.0,100.0,87.0,530.0,353.0,720.0,662.0,0.0,0.0,0.0,0.0,120.0,119.0,1310.0,2734.0
1,AL,35004,2,1330.0,590.0,410.0,270.0,680.0,2600.0,860.0,48895.0,1330.0,49338.0,1190.0,41998.0,240.0,172.0,100.0,155.0,80.0,102.0,230.0,106.0,160.0,788.0,70.0,54.0,90.0,839.0,230.0,3602.0,0.0,60.0,226.0,200.0,1331.0,20.0,6.0,280.0,442.0,40.0,9.0,0.0,0.0,40.0,189.0,30.0,82.0,130.0,126.0,0.0,0.0,0.0,0.0,400.0,5354.0,15571.0,330.0,642.0,60.0,67.0,300.0,165.0,400.0,931.0,310.0,1770.0,310.0,812.0,1280.0,25292.0,1280.0,3109.0,0.0,0.0,560.0,536.0,0.0,0.0,110.0,58.0,120.0,132.0,170.0,27.0,330.0,308.0,50.0,10.0,100.0,197.0,1310.0,5652.0,320.0,569.0,260.0,487.0,240.0,342.0,80.0,74.0,1050.0,2574.0,1100.0,2819.0,0.0,0.0,0.0,0.0,170.0,188.0,1140.0,2998.0
1,AL,35004,3,910.0,290.0,490.0,110.0,450.0,2020.0,620.0,55761.0,910.0,56170.0,840.0,47285.0,250.0,185.0,120.0,380.0,100.0,283.0,310.0,178.0,120.0,584.0,80.0,139.0,70.0,820.0,190.0,3420.0,0.0,40.0,152.0,150.0,2058.0,30.0,492.0,260.0,409.0,40.0,10.0,0.0,0.0,0.0,0.0,0.0,0.0,140.0,138.0,0.0,0.0,0.0,0.0,430.0,6464.0,26501.0,390.0,1022.0,40.0,60.0,370.0,231.0,430.0,1396.0,360.0,2265.0,350.0,1137.0,910.0,36244.0,900.0,5051.0,0.0,0.0,410.0,629.0,40.0,2.0,90.0,47.0,80.0,98.0,60.0,8.0,300.0,448.0,30.0,9.0,70.0,124.0,890.0,6261.0,0.0,0.0,0.0,0.0,40.0,35.0,60.0,52.0,870.0,4422.0,880.0,4583.0,0.0,0.0,0.0,0.0,180.0,327.0,720.0,1974.0
1,AL,35004,4,610.0,90.0,490.0,40.0,300.0,1630.0,530.0,52579.0,610.0,52977.0,580.0,46211.0,220.0,89.0,110.0,119.0,90.0,70.0,290.0,181.0,60.0,339.0,70.0,173.0,60.0,757.0,150.0,2905.0,0.0,40.0,151.0,90.0,1765.0,20.0,171.0,190.0,397.0,40.0,10.0,0.0,0.0,0.0,0.0,30.0,114.0,120.0,132.0,30.0,55.0,0.0,0.0,370.0,6600.0,32150.0,340.0,1242.0,40.0,86.0,340.0,233.0,370.0,1607.0,330.0,2280.0,320.0,1365.0,610.0,36815.0,610.0,5344.0,0.0,0.0,310.0,566.0,0.0,0.0,110.0,66.0,70.0,82.0,0.0,0.0,250.0,398.0,30.0,6.0,40.0,98.0,600.0,6210.0,0.0,0.0,0.0,0.0,0.0,0.0,50.0,43.0,600.0,4777.0,610.0,4920.0,0.0,0.0,0.0,0.0,110.0,262.0,500.0,1561.0


In [0]:
display(markets)

FMID,MarketName,Website,Facebook,Twitter,Youtube,OtherMedia,street,city,County,State,zip,Season1Date,Season1Time,Season2Date,Season2Time,Season3Date,Season3Time,Season4Date,Season4Time,x,y,Location,Credit,WIC,WICcash,SFMNP,SNAP,Organic,Bakedgoods,Cheese,Crafts,Flowers,Eggs,Seafood,Herbs,Vegetables,Honey,Jams,Maple,Meat,Nursery,Nuts,Plants,Poultry,Prepared,Soap,Trees,Wine,Coffee,Beans,Fruits,Grains,Juices,Mushrooms,PetFood,Tofu,WildHarvested,updateTime
1000618,100-Mile Market,http://www.peoplesfoodco-op.org/,,,,,507 Harrison Street,Kalamazoo,Kalamazoo,Michigan,49007,05/07/2014 to 10/29/2014,Wed: 3:00 PM-7:00 PM;,,,,,,,-85.57502,42.29596,Private business parking lot,Y,Y,N,Y,Y,Y,Y,Y,N,Y,Y,N,Y,Y,Y,Y,Y,Y,N,N,Y,Y,Y,Y,N,N,Y,N,Y,N,N,Y,N,N,Y,4/24/2014 6:22:51 PM
1009364,106 S. Main Street Farmers Market,http://thetownofsixmile.wordpress.com/,,,,,106 S. Main Street,Six Mile,,South Carolina,29682,,,,,,,,,-82.8187,34.8042,,Y,N,N,N,N,-,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,2013
1010691,10th Steet Community Farmers Market,,,,,http://agrimissouri.com/mo-grown/grodetail.php?type=mo-grown&ID=275,10th Street and Poplar,Lamar,Barton,Missouri,64759,04/02/2014 to 11/30/2014,Wed: 3:00 PM-6:00 PM;Sat: 8:00 AM-1:00 PM;,,,,,,,-94.2746191,37.495628,,Y,N,N,N,N,-,Y,N,Y,N,Y,N,Y,Y,Y,Y,N,Y,N,N,Y,Y,Y,Y,N,N,N,N,Y,N,N,N,N,N,N,10/28/2014 9:49:46 AM
1002454,112st Madison Avenue,,,,,,112th Madison Avenue,New York,New York,New York,10029,July to November,Tue:8:00 am - 5:00 pm;Sat:8:00 am - 8:00 pm;,,,,,,,-73.9493,40.7939,Private business parking lot,N,N,Y,Y,N,-,Y,N,Y,Y,N,N,Y,Y,Y,Y,N,N,N,Y,N,N,Y,Y,N,N,N,N,N,N,N,N,N,N,N,3/1/2012 10:38:22 AM
1011100,12 South Farmers Market,http://www.12southfarmersmarket.com,12_South_Farmers_Market,@12southfrmsmkt,,@12southfrmsmkt,3000 Granny White Pike,Nashville,Davidson,Tennessee,37204,05/05/2015 to 10/27/2015,Tue: 3:30 PM-6:30 PM;,,,,,,,-86.790709,36.11837,,Y,N,N,N,Y,Y,Y,Y,N,Y,Y,N,Y,Y,Y,Y,Y,Y,N,N,N,Y,Y,Y,N,N,Y,N,Y,N,Y,Y,Y,N,N,5/1/2015 10:40:56 AM
1009845,125th Street Fresh Connect Farmers' Market,http://www.125thStreetFarmersMarket.com,https://www.facebook.com/125thStreetFarmersMarket,https://twitter.com/FarmMarket125th,,Instagram--> 125thStreetFarmersMarket,"163 West 125th Street and Adam Clayton Powell, Jr. Blvd.",New York,New York,New York,10027,06/10/2014 to 11/25/2014,Tue: 10:00 AM-7:00 PM;,,,,,,,-73.9482477,40.8089533,Federal/State government building grounds,Y,Y,N,Y,Y,Y,Y,Y,Y,Y,Y,N,Y,Y,Y,Y,Y,Y,N,Y,N,Y,Y,Y,N,Y,Y,N,Y,N,Y,N,N,N,N,4/7/2014 4:32:01 PM
1005586,12th & Brandywine Urban Farm Market,,https://www.facebook.com/pages/12th-Brandywine-Urban-Farm-Community-Garden/253769448091860,,,https://www.facebook.com/delawareurbanfarmcoalition,12th & Brandywine Streets,Wilmington,New Castle,Delaware,19801,05/16/2014 to 10/17/2014,Fri: 8:00 AM-11:00 AM;,,,,,,,-75.53446,39.742117,"On a farm from: a barn, a greenhouse, a tent, a stand, etc",N,N,N,N,Y,N,N,N,N,N,N,N,Y,Y,N,N,N,N,N,N,N,N,N,N,N,N,N,N,Y,N,N,N,N,N,N,4/3/2014 3:43:31 PM
1008071,14&U Farmers' Market,,https://www.facebook.com/14UFarmersMarket,https://twitter.com/14UFarmersMkt,,,1400 U Street NW,Washington,District of Columbia,District of Columbia,20009,05/03/2014 to 11/22/2014,Sat: 9:00 AM-1:00 PM;,,,,,,,-77.0320505,38.9169984,Other,Y,Y,Y,Y,Y,Y,Y,Y,N,Y,Y,N,Y,Y,Y,Y,N,Y,N,Y,Y,Y,N,N,N,N,N,Y,Y,Y,Y,N,N,N,N,4/5/2014 1:49:04 PM
1010284,175th Street Greenmarket,http://www.grownyc.org/ourmarkets,,,,,175th Street at Wadsworth Ave,New York,New York,New York,10033,06/26/2014 to 11/20/2014,Thu: 8:00 AM-5:00 PM;,,,,,,,-73.9378024,40.8459583,Closed-off public street,Y,Y,Y,Y,Y,-,Y,N,N,N,N,Y,N,Y,N,N,N,N,N,N,Y,N,N,N,N,N,N,N,Y,N,N,N,N,N,N,6/3/2014 12:38:54 PM
1003877,17th Ave Market,http://www.iatp.org/minimarkets,,,,,1622 6th St NE,Minneapolis,Hennepin,Minnesota,55413,June to September,Wed:3:00 pm - 7:00 pm;,,,,,,,-93.2591,45.0044,"Faith-based institution (e.g., church, mosque, synagogue, temple)",N,Y,Y,Y,N,-,N,N,N,N,N,N,Y,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,N,4/28/2011 2:33:54 PM


Next cell does:

- variable type conversions
- renaming columns 
- shorten zip code to 4 digits instead of 5
- add SCHF (number of farm returns) and AGI_STUB - size of adjusted gross income

In [0]:
%sql 
DROP TABLE IF EXISTS cleaned_taxes;

CREATE TABLE cleaned_taxes AS
SELECT state, int(zipcode / 10) as zipcode, 
  int(mars1) as single_returns, 
  int(mars2) as joint_returns, 
  int(numdep) as numdep,
  int(SCHF) as farm_returns,
  int(AGI_STUB) as agi_stub,
  double(A02650) as total_income_amount,
  double(A00300) as taxable_interest_amount,
  double(a01000) as net_capital_gains,
  double(a00900) as biz_net_income
FROM taxes2013


- display cleaned_taxes:

In [0]:
%sql 
select * FROM cleaned_taxes WHERE zipcode in (0000, 9999)

state,zipcode,single_returns,joint_returns,numdep,farm_returns,agi_stub,total_income_amount,taxable_interest_amount,net_capital_gains,biz_net_income
AL,0,488030,122290,571240,8800,1,11444868.0,77952.0,23583.0,824487.0
AL,0,195840,155230,383240,8890,2,17810952.0,81216.0,54639.0,252768.0
AL,0,72710,146880,189340,7940,3,16070153.0,80627.0,84137.0,259836.0
AL,0,24860,126480,134370,6240,4,14288572.0,71086.0,105947.0,214668.0
AL,0,16930,168170,177800,8240,5,26053920.0,149150.0,404166.0,567439.0
AL,0,3530,42190,48270,2320,6,20752068.0,271416.0,1569967.0,822565.0
AL,9999,8290,1890,8690,120,1,194857.0,1390.0,103.0,14037.0
AL,9999,4260,2260,7000,130,2,345889.0,1562.0,701.0,7650.0
AL,9999,1880,1880,2670,100,3,291667.0,1476.0,989.0,6294.0
AL,9999,700,1600,1490,100,4,222540.0,1263.0,1373.0,2886.0


- shorten zip code to 4 digits instead of 5
- count number of markets per zip code per state

In [0]:
%sql 
DROP TABLE IF EXISTS cleaned_markets;
 
CREATE TABLE cleaned_markets AS
SELECT int(zip / 10) as zip, 
    double(count(zip)) as count
FROM markets
WHERE NOT (zip = 00000 OR zip = 99999)
GROUP BY zip


- display cleaned_markets:

In [0]:
%sql
select * from cleaned_markets

zip,count
2867,1.0
2921,2.0
4930,2.0
3731,1.0
1740,2.0
1252,1.0
2833,1.0
9002,2.0
4963,1.0
7730,1.0


create a table of all the summed taxes:

In [0]:
%sql
DROP TABLE IF EXISTS summed_taxes;
CREATE TABLE summed_taxes AS
SELECT  
  zipcode, 
  sum(net_capital_gains) + sum(biz_net_income) as combo,
  sum(farm_returns) as num_farm_returns,
  sum(agi_stub) as sum_agi_stub
FROM cleaned_taxes
GROUP BY zipcode

In [0]:
%sql
select * from summed_taxes

zipcode,combo,num_farm_returns,sum_agi_stub
8592,12082.0,30,105
7240,114112.0,370,42
7253,4927.0,400,189
8086,21256.0,110,63
9564,131769.0,400,105
8105,26483.0,670,126
623,71241.0,90,147
9454,848794.0,80,147
9558,5532.0,0,42
9071,143677.0,0,126


In [0]:
# convert sql tables to dataframes
cleanedTaxes = spark.sql("select * from cleaned_taxes")
cleanedMarkets = spark.sql("select * from cleaned_markets")
summedTaxes = spark.sql("select * from summed_taxes")

In [0]:
display(cleanedTaxes.head(5))

state,zipcode,single_returns,joint_returns,numdep,farm_returns,agi_stub,total_income_amount,taxable_interest_amount,net_capital_gains,biz_net_income
AL,0,488030,122290,571240,8800,1,11444868.0,77952.0,23583.0,824487.0
AL,0,195840,155230,383240,8890,2,17810952.0,81216.0,54639.0,252768.0
AL,0,72710,146880,189340,7940,3,16070153.0,80627.0,84137.0,259836.0
AL,0,24860,126480,134370,6240,4,14288572.0,71086.0,105947.0,214668.0
AL,0,16930,168170,177800,8240,5,26053920.0,149150.0,404166.0,567439.0


In [0]:
display(cleanedMarkets.head(5))

zip,count
2867,1.0
2921,2.0
4930,2.0
3731,1.0
1740,2.0


In [0]:
display(summedTaxes.head(5))

zipcode,combo,num_farm_returns,sum_agi_stub
8592,12082.0,30,105
7240,114112.0,370,42
7253,4927.0,400,189
8086,21256.0,110,63
9564,131769.0,400,105


In [0]:
# join the 3 tables
jointTable = cleanedMarkets.join(summedTaxes, cleanedMarkets.zip == summedTaxes.zipcode, "outer")

In [0]:
display(jointTable)

zip,count,zipcode,combo,num_farm_returns,sum_agi_stub
,,0.0,709592472.0,1666160.0,1071.0
3.0,1.0,,,,
60.0,1.0,,,,
60.0,1.0,,,,
60.0,2.0,,,,
61.0,1.0,,,,
62.0,1.0,,,,
62.0,1.0,,,,
63.0,1.0,,,,
65.0,1.0,,,,


In [0]:
# fill NA with zeros
finalTable = jointTable.na.fill(0)

In [0]:
display(finalTable)

zip,count,zipcode,combo,num_farm_returns,sum_agi_stub
0,0.0,0,709592472.0,1666160,1071
3,1.0,0,0.0,0,0
60,1.0,0,0.0,0,0
60,1.0,0,0.0,0,0
60,2.0,0,0.0,0,0
61,1.0,0,0.0,0,0
62,1.0,0,0.0,0,0
62,1.0,0,0.0,0,0
63,1.0,0,0.0,0,0
65,1.0,0,0.0,0,0


In [0]:
# MLlib prep, combine all into one column of same type
nonFeatureCols = ["zip", "zipcode", "count"]
featureCols = [item for item in finalTable.columns if item not in nonFeatureCols]

In [0]:
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(finalTable)

In [0]:
# split into 70-30
training, test = finalPrep.randomSplit([0.7, 0.3])

#  cache data
training.cache()
test.cache()

print(training.count())
print(test.count())

In [0]:
from pyspark.ml.regression import LinearRegression
 
lrModel = (LinearRegression()
           .setLabelCol("count")
          .setFeaturesCol("features")
          .setElasticNetParam(0.5))
print("Printing out the model Parameters:")
print("-"*20)
print(lrModel.explainParams())
print("-"*20)

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics
lrFitted = lrModel.fit(training)

In [0]:
holdout = (lrFitted
           .transform(test)
           .selectExpr("prediction as raw_prediction", 
                       "double(round(prediction)) as prediction", 
                       "count", 
                       """CASE double(round(prediction)) = count 
                       WHEN true then 1
                       ELSE 0
                       END as equal"""))

In [0]:
display(holdout)

raw_prediction,prediction,count,equal
1.0158475966958656,1.0,0.0,0
0.6956305876760952,1.0,0.0,0
1.1043171248861512,1.0,0.0,0
0.8792232059437297,1.0,1.0,1
1.2263540089145093,1.0,1.0,1
1.2263540089145093,1.0,1.0,1
1.0237805352971625,1.0,1.0,1
1.1556757729602023,1.0,1.0,1
1.1556757729602023,1.0,1.0,1
0.9867268207714944,1.0,1.0,1


In [0]:
# compare with example
# example was 0.20899940793368857
display(holdout.selectExpr("sum(equal)/sum(1)"))

(sum(equal) / sum(1))
0.5934730056406124


In [0]:
# type conversion for regression metrics
rm = RegressionMetrics(holdout.select("prediction", "count").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm.meanSquaredError)
print("MAE: ", rm.meanAbsoluteError)
print("RMSE Squared: ", rm.rootMeanSquaredError)
print("R Squared: ", rm.r2)
print("Explained Variance: ", rm.explainedVariance, "\n")

In [0]:
# attempt to improve results using a pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

rfModel = (RandomForestRegressor()
           .setLabelCol("count")
           .setFeaturesCol("features"))

paramGrid = (ParamGridBuilder()
             .addGrid(rfModel.maxDepth, [5, 10])
             .addGrid(rfModel.numTrees, [20, 60])
             .build())

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator()
      .setEstimator(pipeline)
      .setEstimatorParamMaps(paramGrid)
      .setEvaluator(RegressionEvaluator().setLabelCol("count")))

pipelineFitted = cv.fit(training)

In [0]:
print("The Best Parameters:\n-------------------------")
print(pipelineFitted.bestModel.stages[0])

In [0]:
pipelineFitted.bestModel.stages[0].extractParamMap()

In [0]:
pipelineFitted.bestModel

In [0]:
holdout2 = (pipelineFitted.bestModel
            .transform(test)
            .selectExpr("prediction as raw_prediction",
                        "double(round(prediction)) as prediction", 
                        "count", 
                        """CASE double(round(prediction)) = count 
                        WHEN true then 1
                        ELSE 0
                        END as equal"""))

In [0]:
display(holdout2)

raw_prediction,prediction,count,equal
1.0906271706820092,1.0,0.0,0
0.2753324506172306,0.0,0.0,1
0.5516446396477602,1.0,0.0,0
0.7368641556285185,1.0,1.0,1
1.2696283484186424,1.0,1.0,1
1.2696283484186424,1.0,1.0,1
1.132038748837931,1.0,1.0,1
1.1641764490463624,1.0,1.0,1
1.1641764490463624,1.0,1.0,1
1.1498128123835991,1.0,1.0,1


In [0]:
rm2 = RegressionMetrics(holdout2.select("prediction","count").rdd.map(lambda x: (x[0], x[1])))

print("MSE: ", rm2.meanSquaredError)
print("MAE: ", rm2.meanAbsoluteError)
print("RMSE Squared: ", rm2.rootMeanSquaredError)
print("R Squared: ", rm2.r2)
print("Explained Variance: ", rm2.explainedVariance, "\n")

In [0]:
# compare with old answer 0.40058823529411763
display(holdout2.selectExpr("sum(equal)/sum(1)"))

(sum(equal) / sum(1))
0.6482675261885577


# Part 2
 - create a model to predict the price of a diamond, using Apache Spark ML Pipeline

In [0]:
# read data
dataPath = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
diamonds = (sqlContext.read.format("com.databricks.spark.csv")
            .option("header","true")
            .option("inferSchema", "true")
            .load(dataPath))

In [0]:
display(diamonds)

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
 
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(diamonds) for column in ["cut", "color", "clarity"] ]
 
 
pipeline = Pipeline(stages=indexers)

numDiamonds = pipeline.fit(diamonds).transform(diamonds)

In [0]:
display(numDiamonds)

_c0,carat,cut,color,clarity,depth,table,price,x,y,z,cut_index,color_index,clarity_index
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43,0.0,1.0,2.0
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31,1.0,1.0,0.0
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31,3.0,1.0,3.0
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63,1.0,5.0,1.0
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75,3.0,6.0,2.0
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48,2.0,6.0,4.0
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47,2.0,5.0,5.0
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53,2.0,3.0,0.0
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49,4.0,1.0,1.0
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39,2.0,3.0,3.0


In [0]:
nonFeatureCols = ["_c0","price","cut","color","clarity","x","y","z"]
featureCols = [item for item in numDiamonds.columns if item not in nonFeatureCols]

print(featureCols)

In [0]:
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
             .setInputCols(featureCols)
             .setOutputCol("features"))
 
finalDiamonds = assembler.transform(numDiamonds)

In [0]:
training, test = finalDiamonds.randomSplit([0.7, 0.3])

training.cache()
test.cache()

print(training.count())  
print(test.count())

In [0]:
from pyspark.ml.regression import LinearRegression
 
lrModel = (LinearRegression()
           .setLabelCol("price")
           .setFeaturesCol("features")
           .setElasticNetParam(0.5))

In [0]:
print("Printing out the model Parameters:")
print("-"*20)
print(lrModel.explainParams())
print("-"*20)

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics

lrFitted = lrModel.fit(training)

# considering 10% error and taking abs of negative estimates
holdout = (lrFitted
           .transform(test)
           .selectExpr("prediction as raw_prediction", 
                       "double(round(prediction)) as prediction",
                       "price", 
                       "abs(double(round(prediction))) - price as difference",
                       "(abs(double(round(prediction))) - price)/price as precision",           
                       """CASE (abs(double(round(prediction))) - price)/price < 0.1 and (abs(double(round(prediction))) - price)/price > -0.1
                       WHEN true then 1
                       ELSE 0
                       END as equal"""))

In [0]:
display(holdout)

raw_prediction,prediction,price,difference,precision,equal
-1153.026914145442,-1153.0,335,818.0,2.4417910447761195,0
-910.2277711223704,-910.0,336,574.0,1.7083333333333333,0
-768.8508367155373,-769.0,338,431.0,1.2751479289940828,0
-1033.6429014017897,-1034.0,340,694.0,2.041176470588235,0
-903.7130937116308,-904.0,342,562.0,1.6432748538011697,0
913.5431565657728,914.0,345,569.0,1.6492753623188403,0
-1264.16785504775,-1264.0,351,913.0,2.601139601139601,0
-1394.846850687527,-1395.0,351,1044.0,2.9743589743589745,0
-73.96425982689289,-74.0,354,-280.0,-0.7909604519774012,0
-966.4108781944152,-966.0,355,611.0,1.7211267605633802,0


In [0]:
display(holdout.selectExpr("sum(equal)/sum(1)"))

(sum(equal) / sum(1))
0.2136929460580912


In [0]:
rfModel = (RandomForestRegressor()
          .setLabelCol("price")
          .setFeaturesCol("features"))
 
paramGrid = (ParamGridBuilder()
            .addGrid(rfModel.maxDepth, [5, 8])
            .addGrid(rfModel.numTrees, [5, 10])
            .build())
 
stages = [rfModel]
 
pipeline = Pipeline().setStages(stages)
 
cv = (CrossValidator()
     .setEstimator(pipeline)
     .setEstimatorParamMaps(paramGrid)
     .setEvaluator(RegressionEvaluator().setLabelCol("price")))
 
pipelineFitted = cv.fit(training)

In [0]:
print("The Best Parameters:\n-------------------------")
print(pipelineFitted.bestModel.stages[0])

In [0]:
pipelineFitted.bestModel.stages[0].extractParamMap()

In [0]:
holdout2 = (pipelineFitted.bestModel
            .transform(test)
            .selectExpr("prediction as raw_prediction", 
                        "double(round(prediction)) as prediction",
                        "price", 
                        "abs(double(round(prediction))) - price as difference",
                        "(abs(double(round(prediction))) - price)/price as precision",           
                        """CASE (abs(double(round(prediction))) - price)/price < 0.1 and (abs(double(round(prediction))) - price)/price > -0.1
                        WHEN true then 1
                        ELSE 0
                        END as equal"""))

In [0]:
display(holdout2)

raw_prediction,prediction,price,difference,precision,equal
1641.0349981078266,1641.0,335,1306.0,3.8985074626865672,0
738.5851601412799,739.0,336,403.0,1.199404761904762,0
1334.1559403082765,1334.0,338,996.0,2.946745562130177,0
1145.683310815039,1146.0,340,806.0,2.3705882352941177,0
1773.124236992728,1773.0,342,1431.0,4.184210526315789,0
1131.2067827249307,1131.0,345,786.0,2.2782608695652176,0
679.5799762166376,680.0,351,329.0,0.9373219373219374,0
821.2223229918725,821.0,351,470.0,1.339031339031339,0
830.95840543802,831.0,354,477.0,1.347457627118644,0
784.7478686933999,785.0,355,430.0,1.2112676056338028,0


In [0]:
display(holdout2.selectExpr("sum(equal)/sum(1)"))

(sum(equal) / sum(1))
0.3500565824217276
