In [1]:
!gdown -q --fuzzy 1_1ecJ3ZD-c7mgzpE0JNpWBpaT3xqzuEM -O weekly-patterns-nyc-2019-2020-sample.csv

In [2]:
%%shell
pip --quiet install pyspark
pip --quiet install pyproj

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.7/7.7 MB[0m [31m100.5 MB/s[0m eta [36m0:00:00[0m
[?25h



In [3]:
import pandas as pd
import json
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession(sc)
spark

In [4]:
weekly_pattern_csv = 'weekly-patterns-nyc-2019-2020-sample.csv'
weekly_pattern0 = sc.textFile(weekly_pattern_csv, use_unicode=True).cache()

In [5]:
columns = weekly_pattern0.take(1)
len(columns[0].split(','))

28

In [10]:
#In this challenge, we are only interested in the safegraph_placekey column 
# which matches the placekey column in the Weekly Pattern dataset.
nyc_supermarkets = pd.read_csv('nyc_supermarkets.csv')
nyc_supermarkets = list(nyc_supermarkets['safegraph_placekey'])

In [11]:
nyc_supermarkets[:3]

['228-222@627-s8r-66k', '22j-222@627-s8j-bff', 'zzy-222@627-s8j-99f']

In [12]:
# This CSV file contains the centroid location for each census block group (CBG).
# This information will be useful in computing the haversine distance between two CBGs. 
# cbg_fips refers to the FIPS code of the CBGs used in Safegraph
nyc_cbg_centroids = pd.read_csv('nyc_cbg_centroids.csv')
nyc_cbg_centroids.index = [str(x) for x in nyc_cbg_centroids['cbg_fips']]
cbg_fips = nyc_cbg_centroids.drop(columns = ['cbg_fips'])
cbg = list(cbg_fips.index)

In [13]:
cbg_fips[:3]

Unnamed: 0,latitude,longitude
360050001000,40.798418,-73.888787
360050001001,40.791133,-73.882652
360050002001,40.813969,-73.860872


In [14]:
columns_interested = ['"placekey"','"poi_cbg"','"visitor_home_cbgs"', '"date_range_start"','"date_range_end"']
index = []
for x in columns_interested:
  index.append(columns[0].split(',').index(x))

In [15]:
def skipFirst(part_id,rows):
  if part_id == 0:
    next(rows)
  for row in rows:
    yield row

In [16]:
weekly_pattern = weekly_pattern0.mapPartitionsWithIndex(skipFirst)

In [17]:
import re
def modify(row):
    pattern = '[\[{](.*?)[\]}]'
    matches = re.finditer(pattern,row)
    positions = []
    for match in matches:
        positions.append((match.start(), match.end()))
    quot_modified = []
    if len(positions)>0:
        for x in positions:
            target_quot = row[x[0]:x[1]]
            replacement_quot = re.sub(",","_", target_quot)
            quot_modified.append((replacement_quot,x))
        output_row = row
        for y in quot_modified:
            output_row = output_row[:y[1][0]]+y[0]+output_row[y[1][1]:]
        return output_row.lower().split(',')
    else:
        return row.lower().split(',')

In [18]:
def modify_visitor_home_cgbs(row):
  vhc = row[2][2:-2].split('_')
  vhc1 = [x.split(':') for x in vhc]
  out = []
  for x in vhc1:
    if len(x)==2:
      out.append((x[0][2:-2],x[1]))
  return row[0],row[1],out,row[3],row[4]

In [19]:
approach_step1 = weekly_pattern.map(modify)\
                                .map(lambda x:(x[index[0]],x[index[1]],x[index[2]],x[index[3]],x[index[4]]))\
                                .map(modify_visitor_home_cgbs)\
                                .filter(lambda x: x[0] in nyc_supermarkets)

approach_step1.count()

907

In [20]:
approach_step1.take(2)

[('23d-222@627-s9v-qzz',
  '360810096001',
  [('360810166001', '4'), ('360810096001', '4'), ('360810098002', '4')],
  '2018-12-31t00:00:00-05:00',
  '2019-01-07t00:00:00-05:00'),
 ('235-222@627-wc3-9zz',
  '360470045001',
  [('360470121001', '7'),
   ('360470114003', '6'),
   ('360470265003', '5'),
   ('360470047001', '4'),
   ('360470075004', '4'),
   ('360470377003', '4'),
   ('360470149002', '4'),
   ('360470343001', '4'),
   ('360810456001', '4'),
   ('360470063002', '4'),
   ('360470069002', '4'),
   ('360470740002', '4'),
   ('360470325002', '4'),
   ('360470007002', '4'),
   ('360470377002', '4'),
   ('360810269012', '4'),
   ('360470135003', '4'),
   ('360470067002', '4'),
   ('360471128002', '4'),
   ('360470558002', '4'),
   ('360470315002', '4'),
   ('360471502003', '4'),
   ('360470045001', '4'),
   ('360610188004', '4'),
   ('360470343003', '4'),
   ('360470143003', '4'),
   ('360811579015', '4'),
   ('360470678001', '4'),
   ('360470800002', '4'),
   ('360470007001', '4')

In [21]:
from datetime import datetime
Mar_2019_start_str = '2019-03-01'
Mar_2019_end_str = '2019-03-31'
Mar_2019_start = datetime.strptime(Mar_2019_start_str,'%Y-%m-%d')
Mar_2019_end = datetime.strptime(Mar_2019_end_str,'%Y-%m-%d')

Oct_2019_start_str = '2019-10-01'
Oct_2019_end_str = '2019-10-31'
Oct_2019_start = datetime.strptime(Oct_2019_start_str,'%Y-%m-%d')
Oct_2019_end = datetime.strptime(Oct_2019_end_str,'%Y-%m-%d')

Mar_2020_start_str = '2020-03-01'
Mar_2020_end_str = '2020-03-31'
Mar_2020_start = datetime.strptime(Mar_2020_start_str,'%Y-%m-%d')
Mar_2020_end = datetime.strptime(Mar_2020_end_str,'%Y-%m-%d')

Oct_2020_start_str = '2020-10-01'
Oct_2020_end_str = '2020-10-31'
Oct_2020_start = datetime.strptime(Oct_2020_start_str,'%Y-%m-%d')
Oct_2020_end = datetime.strptime(Oct_2020_end_str,'%Y-%m-%d')

In [22]:
def filter_date(row):
  start = row[3]
  end = row[4]
  if Mar_2019_start <= end and start <= Mar_2019_end:
    return True
  elif Oct_2019_start <= end and start <= Oct_2019_end:
    return True
  elif Mar_2020_start <= end and start <= Mar_2020_end:
    return True
  elif Oct_2020_start <= end and start <= Oct_2020_end:
    return True
  else:
    return False

In [23]:
approach_step2 = approach_step1.map(lambda x: (x[0],x[1],x[2],datetime.strptime(x[3][:10],'%Y-%m-%d'),datetime.strptime(x[4][:10],'%Y-%m-%d')))\
                                .filter(filter_date)
approach_step2.count()

201

In [24]:
# transform to EPSG2263
import pyproj
proj = pyproj.Proj(init='EPSG:2263', preserve_units=True)
cbg_fips_proj = cbg_fips.apply(lambda row: proj(row['longitude'],row['latitude']), axis=1)

  in_crs_string = _prepare_from_proj_string(in_crs_string)


In [25]:
import numpy as np
# used vistor home cbgs as the travel origin, and get distance between the store and visitors
def visitor_distance(rows):
  for x in rows:
    start = x[3]
    end = x[4]
    store = x[1]
    for visitors in x[2]:
      if visitors[0] in cbg:
        store_loc = np.array(cbg_fips_proj[store])
        visitor_loc = np.array(cbg_fips_proj[visitors[0]])
        distance = float(np.sqrt(sum((store_loc-visitor_loc)**2)))
        if Mar_2019_start <= end and start <= Mar_2019_end:
          yield visitors[0],(0,distance/5280,int(visitors[1]))
        elif Oct_2019_start <= end and start <= Oct_2019_end:
          yield visitors[0],(1,distance/5280,int(visitors[1]))
        elif Mar_2020_start <= end and start <= Mar_2020_end:
          yield visitors[0],(2,distance/5280,int(visitors[1]))
        elif Oct_2020_start <= end and start <= Oct_2020_end:
          yield visitors[0],(3,distance/5280,int(visitors[1]))

In [26]:
approach_step2.take(1)

[('235-222@627-wc3-9zz',
  '360470045001',
  [('360470121001', '7'),
   ('360470114003', '6'),
   ('360470265003', '5'),
   ('360470047001', '4'),
   ('360470075004', '4'),
   ('360470377003', '4'),
   ('360470149002', '4'),
   ('360470343001', '4'),
   ('360810456001', '4'),
   ('360470063002', '4'),
   ('360470069002', '4'),
   ('360470740002', '4'),
   ('360470325002', '4'),
   ('360470007002', '4'),
   ('360470377002', '4'),
   ('360810269012', '4'),
   ('360470135003', '4'),
   ('360470067002', '4'),
   ('360471128002', '4'),
   ('360470558002', '4'),
   ('360470315002', '4'),
   ('360471502003', '4'),
   ('360470045001', '4'),
   ('360610188004', '4'),
   ('360470343003', '4'),
   ('360470143003', '4'),
   ('360811579015', '4'),
   ('360470678001', '4'),
   ('360470800002', '4'),
   ('360470007001', '4'),
   ('360850040004', '4'),
   ('360470045002', '4'),
   ('360470357001', '4'),
   ('360470327001', '4'),
   ('360470504003', '4'),
   ('360470271001', '4'),
   ('360470059001', '

In [30]:
approach_step34 = approach_step2.mapPartitions(visitor_distance)
approach_step34.count()

2126

In [31]:
from functools import reduce

In [35]:
def average_formular(d,n):
  D = np.array(d)
  N = np.array(n).astype(int)
  return float(round(np.sum(D*N)/np.sum(N),2))

def average_distance(cbgs):
  mar_2019,oct_2019,mar_2020,oct_2020 = '','','',''
  mar_2019_d,oct_2019_d,mar_2020_d, oct_2020_d = [],[],[],[]
  mar_2019_n,oct_2019_n,mar_2020_n, oct_2020_n = [],[],[],[]
  for x in cbgs:
    if x[0]==0:
      mar_2019_d.append(x[1])
      mar_2019_n.append(x[2])
    elif x[0]==1:
      oct_2019_d.append(x[1])
      oct_2019_n.append(x[2])
    elif x[0]==2:
      mar_2020_d.append(x[1])
      mar_2020_n.append(x[2])
    elif x[0]==3:
      oct_2020_d.append(x[1])
      oct_2020_n.append(x[2])
  if len(mar_2019_d)>0:
    mar_2019 = average_formular(mar_2019_d,mar_2019_n)
  if len(oct_2019_n)>0:
    oct_2019 = average_formular(oct_2019_d,oct_2019_n)
  if len(mar_2020_n)>0:
    mar_2020 = average_formular(mar_2020_d, mar_2020_n)
  if len(oct_2020_n)>0:
    oct_2020 = average_formular(oct_2020_d,oct_2020_n)
  return mar_2019,oct_2019,mar_2020,oct_2020


approach_step5 = approach_step34.groupByKey()\
                                .mapValues(average_distance)\
                                .sortByKey()\
                                .map(lambda x: (x[0],x[1][0],x[1][1],x[1][2],x[1][3]))
approach_step5.count()

1674

In [39]:
output_df = approach_step5.toDF(['cbg_fips','2019-03','2019-10','2020-03','2020-10'])
output = output_df.rdd
#output.saveAsTextFile('BDM_FC_23499013_Jin.csv')

In [40]:
output_df.show()

+------------+-------+-------+-------+-------+
|    cbg_fips|2019-03|2019-10|2020-03|2020-10|
+------------+-------+-------+-------+-------+
|360050002003|       |   5.28|       |       |
|360050016001|    3.0|   null|       |       |
|360050019001|   0.49|   null|       |       |
|360050020002|    2.8|   null|       |       |
|360050027021|   0.19|   null|       |       |
|360050027022|       |   null|    2.0|       |
|360050027023|    0.0|   null|       |       |
|360050027024|   0.37|   null|       |       |
|360050033003|   0.17|   null|       |       |
|360050039001|   0.15|   null|       |       |
|360050041001|   0.24|   null|       |       |
|360050043004|       |   4.54|       |       |
|360050044002|       |   null|       |   4.81|
|360050048001|   0.19|   null|       |       |
|360050048002|    0.3|   null|       |       |
|360050050013|   0.36|   null|       |       |
|360050050021|    3.2|   null|       |       |
|360050050024|   0.19|   null|       |       |
|360050051003

In [None]:
!gcloud auth login

Go to the following link in your browser:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=32555940559.apps.googleusercontent.com&redirect_uri=https%3A%2F%2Fsdk.cloud.google.com%2Fauthcode.html&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fappengine.admin+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=AYEBOj9To2N3fRe0eb6QjrcqzGhwwC&prompt=consent&access_type=offline&code_challenge=sC2c5BXJg_RBnELRM7L_SQ3G9jiDGPkSmeIWJ6oCLO0&code_challenge_method=S256

Enter authorization code: 4/0AbUR2VNTTGLtpTPeRdP7o6KB5qWCYA8JnJibmltg-yfcPoaPHPHWnbHpWsEoPavsZiUrfQ

You are now logged in as [zhiyuan.jin1201@gmail.com].
Your current project is [None].  You can change this setting by running:
  $ gcloud config set project PROJECT_ID

In [None]:
!gcloud projects list

PROJECT_ID            NAME              PROJECT_NUMBER
bda-12345             bda-12345         369021837527
directed-will-384217  My First Project  913257743120


In [None]:
!gcloud config set project bda-12345
!gcloud config set compute/region us-west1
!gcloud config set compute/zone us-west1-a
!gcloud config set dataproc/region us-west1

Updated property [core/project].
Updated property [compute/region].
Updated property [compute/zone].
Updated property [dataproc/region].


In [None]:
!gcloud dataproc clusters create bda-final-challenge --enable-component-gateway --region us-west1 --zone us-west1-a --master-machine-type n1-standard-4 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-4 --worker-boot-disk-size 500 --image-version 2.0-debian10 --project bda-12345

Waiting on operation [projects/bda-12345/regions/us-west1/operations/dff8dec7-c136-39b3-8af7-4d9819efef4e].

Created [https://dataproc.googleapis.com/v1/projects/bda-12345/regions/us-west1/clusters/bda-final-challenge] Cluster placed in zone [us-west1-a].


In [None]:
%%writefile BDM_FC_23499013_Jin.py
import pandas as pd
import json
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession(sc)
spark

weekly_pattern_csv = 'gs://bdma/data/weekly-patterns-nyc-2019-2020/part-*'
weekly_pattern0 = sc.textFile(weekly_pattern_csv, use_unicode=True).cache()
columns = weekly_pattern0.take(1)

nyc_supermarkets = pd.read_csv('nyc_supermarkets.csv')
nyc_supermarkets = list(nyc_supermarkets['safegraph_placekey'])

nyc_cbg_centroids = pd.read_csv('nyc_cbg_centroids.csv')
nyc_cbg_centroids.index = [str(x) for x in nyc_cbg_centroids['cbg_fips']]
cbg_fips = nyc_cbg_centroids.drop(columns = ['cbg_fips'])
cbg = list(cbg_fips.index)

columns_interested = ['"placekey"','"poi_cbg"','"visitor_home_cbgs"', '"date_range_start"','"date_range_end"']
index = []
for x in columns_interested:
  index.append(columns[0].split(',').index(x))

def skipFirst(part_id,rows):
  if part_id == 0:
    next(rows)
  for row in rows:
    yield row

weekly_pattern = weekly_pattern0.mapPartitionsWithIndex(skipFirst)

import re
def modify(row):
    pattern = '[\[{](.*?)[\]}]'
    matches = re.finditer(pattern,row)
    positions = []
    for match in matches:
        positions.append((match.start(), match.end()))
    quot_modified = []
    if len(positions)>0:
        for x in positions:
            target_quot = row[x[0]:x[1]]
            replacement_quot = re.sub(",","_", target_quot)
            quot_modified.append((replacement_quot,x))
        output_row = row
        for y in quot_modified:
            output_row = output_row[:y[1][0]]+y[0]+output_row[y[1][1]:]
        return output_row.lower().split(',')
    else:
        return row.lower().split(',')

def modify_visitor_home_cgbs(row):
  vhc = row[2][2:-2].split('_')
  vhc1 = [x.split(':') for x in vhc]
  out = []
  for x in vhc1:
    if len(x)==2:
      out.append((x[0][2:-2],x[1]))
  return row[0],row[1],out,row[3],row[4]

approach_step1 = weekly_pattern.map(modify)\
                                .map(lambda x:(x[index[0]],x[index[1]],x[index[2]],x[index[3]],x[index[4]]))\
                                .filter(lambda x: len(x[2])>3)\
                                .map(modify_visitor_home_cgbs)\
                                .filter(lambda x: x[0] in nyc_supermarkets)

from datetime import datetime
Mar_2019_start_str = '2019-03-01'
Mar_2019_end_str = '2019-03-31'
Mar_2019_start = datetime.strptime(Mar_2019_start_str,'%Y-%m-%d')
Mar_2019_end = datetime.strptime(Mar_2019_end_str,'%Y-%m-%d')

Oct_2019_start_str = '2019-10-01'
Oct_2019_end_str = '2019-10-31'
Oct_2019_start = datetime.strptime(Oct_2019_start_str,'%Y-%m-%d')
Oct_2019_end = datetime.strptime(Oct_2019_end_str,'%Y-%m-%d')

Mar_2020_start_str = '2020-03-01'
Mar_2020_end_str = '2020-03-31'
Mar_2020_start = datetime.strptime(Mar_2020_start_str,'%Y-%m-%d')
Mar_2020_end = datetime.strptime(Mar_2020_end_str,'%Y-%m-%d')

Oct_2020_start_str = '2020-10-01'
Oct_2020_end_str = '2020-10-31'
Oct_2020_start = datetime.strptime(Oct_2020_start_str,'%Y-%m-%d')
Oct_2020_end = datetime.strptime(Oct_2020_end_str,'%Y-%m-%d')

def filter_date(row):
  start = row[3]
  end = row[4]
  if Mar_2019_start <= end and start <= Mar_2019_end:
    return True
  elif Oct_2019_start <= end and start <= Oct_2019_end:
    return True
  elif Mar_2020_start <= end and start <= Mar_2020_end:
    return True
  elif Oct_2020_start <= end and start <= Oct_2020_end:
    return True
  else:
    return False

approach_step2 = approach_step1.map(lambda x: (x[0],x[1],x[2],datetime.strptime(x[3][:10],'%Y-%m-%d'),datetime.strptime(x[4][:10],'%Y-%m-%d')))\
                                .filter(filter_date)
# transform to EPSG2263
import pyproj
proj = pyproj.Proj(init='EPSG:2263', preserve_units=True)
cbg_fips_proj = cbg_fips.apply(lambda row: proj(row['longitude'],row['latitude']), axis=1)

import numpy as np
# used vistor home cbgs as the travel origin, and get distance between the store and visitors
def visitor_distance(rows):
  for x in rows:
    start = x[3]
    end = x[4]
    for visitors in x[2]:
      if visitors[0] in cbg:
        store_loc = np.array(cbg_fips_proj[x[1]])
        visitor_loc = np.array(cbg_fips_proj[visitors[0]])
        distance = float(np.sqrt(sum((store_loc-visitor_loc)**2)))
        if Mar_2019_start <= end and start <= Mar_2019_end:
          yield visitors[0],(0,distance/5280,int(visitors[1]))
        elif Oct_2019_start <= end and start <= Oct_2019_end:
          yield visitors[0],(1,distance/5280,int(visitors[1]))
        elif Mar_2020_start <= end and start <= Mar_2020_end:
          yield visitors[0],(2,distance/5280,int(visitors[1]))
        elif Oct_2020_start <= end and start <= Oct_2020_end:
          yield visitors[0],(3,distance/5280,int(visitors[1]))

approach_step34 = approach_step2.mapPartitions(visitor_distance)

from functools import reduce

def average_distance(cbgs):
  mar_2019,oct_2019,mar_2020, oct_2020 = [],[],[],[]
  for x in cbgs:
    if x[0]==0:
      mar_2019.append((x[1]*x[2],x[2]))
    elif x[0]==1:
      oct_2019.append((x[1]*x[2],x[2]))
    elif x[0]==2:
      mar_2020.append((x[1]*x[2],x[2]))
    else:
      oct_2020.append((x[1]*x[2],x[2]))
  if len(mar_2019)>0:
    mar_2019 = reduce(lambda x,y:(x[0]+y[0],x[1]+y[1]),mar_2019)  
    mar_2019_ave = round(mar_2019[0]/mar_2019[1],2)
  else:
    mar_2019_ave = ''
  if len(oct_2019)>0:
    oct_2019 = reduce(lambda x,y:(x[0]+y[0],x[1]+y[1]),oct_2019)  
    oct_2019_ave = round(oct_2019[0]/oct_2019[1],2)
  else:
    oct_2019_ave = ''
  if len(mar_2020)>0:
    mar_2020 = reduce(lambda x,y:(x[0]+y[0],x[1]+y[1]),mar_2020)  
    mar_2020_ave = round(mar_2020[0]/mar_2020[1],2)
  else:
    mar_2020_ave = ''
  if len(oct_2020)>0:
    oct_2020 = reduce(lambda x,y:(x[0]+y[0],x[1]+y[1]),oct_2020)
    oct_2020_ave = round(oct_2020[0]/oct_2020[1],2)
  else:
    oct_2020_ave = ''
  return mar_2019_ave,oct_2019_ave,mar_2020_ave, oct_2020_ave

approach_step5 = approach_step34.groupByKey()\
                                .mapValues(average_distance)\
                                .map(lambda x: (x[0],x[1][0],x[1][1],x[1][2],x[1][3]))

output_df = approach_step5.toDF(['cbg_fips','2019-03','2019-10','2020-03','2020-10'])
output = output_df.rdd
output.saveAsTextFile('gs://bdma/shared/2023_spring/FC/23499013_Jin/BDM_FC_23499013_Jin.csv')


Writing BDM_FC_23499013_Jin.py


In [None]:
%%writefile props.conf
spark.hadoop.fs.gs.requester.pays.mode=AUTO
spark.hadoop.fs.gs.requester.pays.project.id=bda-12345
spark.executor.instances='4'
spark.executor.cores='4'

Writing props.conf


In [None]:
import time
start_time = time.time()

!gcloud dataproc jobs submit pyspark --cluster bda-final-challenge --files nyc_supermarkets.csv,nyc_cbg_centroids.csv \
--properties-file=props.conf BDM_FC_23499013_Jin.py -- gs://bdma/shared/2023_spring/FC/23499013_Jin

print(" %s second " %(time.time()-start_time))

Job [af12ee99b10d4820aa513c13755c6c56] submitted.
Waiting for job output...
23/05/19 01:15:28 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/05/19 01:15:28 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/05/19 01:15:28 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/05/19 01:15:28 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
23/05/19 01:15:28 INFO org.sparkproject.jetty.util.log: Logging initialized @6130ms to org.sparkproject.jetty.util.log.Slf4jLog
23/05/19 01:15:29 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_362-b09
23/05/19 01:15:29 INFO org.sparkproject.jetty.server.Server: Started @6278ms
23/05/19 01:15:29 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@507b0f68{HTTP/1.1, (http/1.1)}{0.0.0.0:34815}
23/05/19 01:15:29 INFO org.apache.hadoop.yarn.client.RMPro

In [None]:
!gsutil -u bda-12345 cp -r gs://bdma/shared/2023_spring/FC/23499013_Jin/BDM_FC_23499013_Jin.csv .

Copying gs://bdma/shared/2023_spring/FC/23499013_Jin/BDM_FC_23499013_Jin.csv/_SUCCESS...
/ [0 files][    0.0 B/    0.0 B]                                                / [1 files][    0.0 B/    0.0 B]                                                Copying gs://bdma/shared/2023_spring/FC/23499013_Jin/BDM_FC_23499013_Jin.csv/part-00000...
Copying gs://bdma/shared/2023_spring/FC/23499013_Jin/BDM_FC_23499013_Jin.csv/part-00001...
/ [3 files][ 62.2 KiB/ 62.2 KiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying gs://bdma/shared/2023_spring/FC/23499013_Jin/BDM_FC_23499013_Jin.csv/part-00002...
Copying gs://bdma/shared/2023_spring/FC/23499013_Jin/BDM_FC_23499013_Jin.csv/part-00003...
Copying gs://bdma/shared/2023_spring/FC/23499013_Jin

In [None]:
weekly_pattern_csv = 'BDM_FC_23499013_Jin.csv'
weekly_pattern0 = sc.textFile(weekly_pattern_csv, use_unicode=True).cache()

In [None]:
weekly_pattern0.take(15)

["Row(cbg_fips='360810661001', 2019-03=1.35, 2019-10=2.63, 2020-03=1.76, 2020-10=1.95)",
 "Row(cbg_fips='360810663001', 2019-03=1.08, 2019-10=2.13, 2020-03=0.72, 2020-10=0.86)",
 "Row(cbg_fips='360810663002', 2019-03=0.75, 2019-10=2.0, 2020-03=2.08, 2020-10=2.1)",
 "Row(cbg_fips='360810663003', 2019-03=1.08, 2019-10=2.92, 2020-03=1.67, 2020-10=1.87)",
 "Row(cbg_fips='360810664001', 2019-03=10.13, 2019-10=10.13, 2020-03=4.37, 2020-10=14.16)",
 "Row(cbg_fips='360810664002', 2019-03=7.14, 2019-10=7.03, 2020-03=8.86, 2020-10=10.96)",
 "Row(cbg_fips='360810664003', 2019-03=4.27, 2019-10=1.79, 2020-03=6.1, 2020-10=None)",
 "Row(cbg_fips='360810664004', 2019-03=4.85, 2019-10=8.44, 2020-03=6.96, 2020-10=5.87)",
 "Row(cbg_fips='360810664005', 2019-03=7.07, 2019-10=7.12, 2020-03=5.06, 2020-10=1.7)",
 "Row(cbg_fips='360810664006', 2019-03=10.33, 2019-10=11.42, 2020-03=5.34, 2020-10=7.69)",
 "Row(cbg_fips='360810664007', 2019-03=7.58, 2019-10=5.9, 2020-03=7.9, 2020-10=8.79)",
 "Row(cbg_fips='36081

In [None]:
weekly_pattern0.count()

6272