#### Load the Project Data

In [0]:
## Preparing the Environment containing all the required Pyspark & Pandas dataframes, temporary tables 

In [0]:
%run ./Chinook_Load_data_Load



initialising the spark session


Loading the dataframes


In [0]:
from pyspark.sql.functions import *
from pyspark.sql import Row
import pandas as pd
import datetime
# pd.set_options("display.max_rows", None)
pd.set_option("display.max_columns", None) # this option enables to show all the columns in a single frame

Loading the dataframes into the temp tables


below temp tables are available to use
Out[11]: ['album',
 'artist',
 'customers',
 'employees',
 'genre',
 'inline',
 'invoice',
 'medtype',
 'playlist',
 'playtrack',
 'track']

below dataframes are available to use
Out[12]: ['pspk_df_album',
 'pspk_df_artist',
 'pspk_df_cust',
 'pspk_df_emp',
 'pspk_df_genre',
 'pspk_df_invoice',
 'pspk_df_inline',
 'pspk_df_medtype',
 'pspk_df_playlist',
 'pspk_df_playtrack',
 'pspk_df_track']

available python dataframes to use
Variable            Type         Data/Info
------------------------------------------
pd_df_album         DataFrame         AlbumId             <...>n\n[347 rows x 3 columns]
pd_df_artist        DataFrame         ArtistId            <...>n\n[275 rows x 2 columns]
pd_df_cust          DataFrame        CustomerId  FirstName<...>@yahoo.in             3  
pd_df_emp           DataFrame       EmployeeId  LastName F<...>  laura@chinookcorp.com  
pd_df_genre         DataFrame        GenreId              <...>   25               Opera
pd_df_inline        DataFrame          InvoiceLineId  Invo<...>\n[2240 rows x 5 columns]
pd_df_invoice       DataFrame         InvoiceId  CustomerI<...>n\n[412 rows x 9 columns]
pd_df_medtype       DataFrame       MediaTypeId           <...>           AAC audio file
pd_df_playlist      DataFrame        PlaylistId           <...>              On-The-Go 1
pd_df_playtrack     DataFrame          PlaylistId  TrackId<...>\n[5000 rows x 

## Questions to solve

#### 1. Provide a query showing Customers (just their full names, customer ID and country) who are not in the US.

##### SQL

In [0]:
# we can use the customers table to query the customers fullName(firstName + lastName), CustomerId and Country which contains a simple SQL select, from, where statements

In [0]:
%sql
select firstname || " " || lastname custFullName, customerId, country
from customers
where country != 'USA'

custFullName,customerId,country
Luís Gonçalves,1,Brazil
Leonie Köhler,2,Germany
François Tremblay,3,Canada
Bjørn Hansen,4,Norway
František Wichterlová,5,Czech Republic
Helena Holý,6,Czech Republic
Astrid Gruber,7,Austria
Daan Peeters,8,Belgium
Kara Nielsen,9,Denmark
Eduardo Martins,10,Brazil


In [0]:
## Aggregated Query to check if we are getting the same results or not 

In [0]:
%sql
with tb1 as (
  select firstname || " " || lastname custFullName, customerId, country
  from customers
  where country != 'USA'
) select country, count(1) totalCounts from tb1 
group by country
order by totalCounts desc


country,totalCounts
Canada,8
France,5
Brazil,5
Germany,4
United Kingdom,3
India,2
Czech Republic,2
Portugal,2
Sweden,1
Argentina,1


##### Pyspark

In [0]:
## SQL equivalnet for select, where with Pyspark are filter, selectExpr, select

In [0]:
display(
    pspk_df_cust
    .filter("country != 'USA'")
    .selectExpr("concat(firstname,' ',lastname) as custFullName", "customerId", "country")
)

custFullName,customerId,country
Luís Gonçalves,1,Brazil
Leonie Köhler,2,Germany
François Tremblay,3,Canada
Bjørn Hansen,4,Norway
František Wichterlová,5,Czech Republic
Helena Holý,6,Czech Republic
Astrid Gruber,7,Austria
Daan Peeters,8,Belgium
Kara Nielsen,9,Denmark
Eduardo Martins,10,Brazil


In [0]:
display(
    pspk_df_cust
    .filter("country != 'USA'")
    .selectExpr("concat(firstname,' ',lastname) as custFullName", "customerId", "country")
    .groupBy("country")
    .count()
    .orderBy(desc("count"))
)

country,count
Canada,8
France,5
Brazil,5
Germany,4
United Kingdom,3
India,2
Czech Republic,2
Portugal,2
Sweden,1
Argentina,1


##### Python

In [0]:
pd_df_cust_filt = pd_df_cust[pd_df_cust['Country'] != "USA"]
pd_df_cust_filt['custFullName'] = pd_df_cust_filt['FirstName'] +" " +pd_df_cust_filt['LastName']
pd_df_cust_filt_final = pd_df_cust_filt[["custFullName", "CustomerId", "Country"]].reset_index(drop=True)
pd_df_cust_filt_final

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  pd_df_cust_filt['custFullName'] = pd_df_cust_filt['FirstName'] +" " +pd_df_cust_filt['LastName']


Unnamed: 0,custFullName,CustomerId,Country
0,Luís Gonçalves,1,Brazil
1,Leonie Köhler,2,Germany
2,François Tremblay,3,Canada
3,Bjørn Hansen,4,Norway
4,František Wichterlová,5,Czech Republic
5,Helena Holý,6,Czech Republic
6,Astrid Gruber,7,Austria
7,Daan Peeters,8,Belgium
8,Kara Nielsen,9,Denmark
9,Eduardo Martins,10,Brazil


In [0]:
# pd_df_cust_filt_final.groupby("Country")['Country'].count().to_frame()
(
    pd_df_cust_filt_final
    .groupby("Country")
    .agg(counts=("Country", "count"))
    .sort_values(by=['counts'], ascending=False)
    .reset_index()
)


Unnamed: 0,Country,counts
0,Canada,8
1,Brazil,5
2,France,5
3,Germany,4
4,United Kingdom,3
5,Czech Republic,2
6,Portugal,2
7,India,2
8,Belgium,1
9,Italy,1


In [0]:
# using .query to get the same results as above for pandas dataframe

In [0]:
(
    pd_df_cust_filt
    .query("Country != 'USA'")
    .assign(custFullName = pd_df_cust_filt["FirstName"] +" " +pd_df_cust_filt["LastName"])
    [["custFullName", "CustomerId", "Country"]]
)


Unnamed: 0,custFullName,CustomerId,Country
0,Luís Gonçalves,1,Brazil
1,Leonie Köhler,2,Germany
2,François Tremblay,3,Canada
3,Bjørn Hansen,4,Norway
4,František Wichterlová,5,Czech Republic
5,Helena Holý,6,Czech Republic
6,Astrid Gruber,7,Austria
7,Daan Peeters,8,Belgium
8,Kara Nielsen,9,Denmark
9,Eduardo Martins,10,Brazil


##### RDDs

In [0]:
## Creating a RDDs.

In [0]:
pspk_rdd_cust = pspk_df_cust.rdd

In [0]:
## To get the same results as SQL while using RDD and as RDD allows to work on each row,first we need to filter out the rows and later to concat firstName & lastName we can use map function.

In [0]:
(
    pspk_rdd_cust
    .filter(lambda x: x['Country'] != 'USA')
    .map(lambda row: (row['FirstName']+" "+row["LastName"], row['CustomerId'], row['Country']))
    # .take(2)
    .collect()
)

Out[31]: [('Luís Gonçalves', 1, 'Brazil'),
 ('Leonie Köhler', 2, 'Germany'),
 ('François Tremblay', 3, 'Canada'),
 ('Bjørn Hansen', 4, 'Norway'),
 ('František Wichterlová', 5, 'Czech Republic'),
 ('Helena Holý', 6, 'Czech Republic'),
 ('Astrid Gruber', 7, 'Austria'),
 ('Daan Peeters', 8, 'Belgium'),
 ('Kara Nielsen', 9, 'Denmark'),
 ('Eduardo Martins', 10, 'Brazil'),
 ('Alexandre Rocha', 11, 'Brazil'),
 ('Roberto Almeida', 12, 'Brazil'),
 ('Fernanda Ramos', 13, 'Brazil'),
 ('Mark Philips', 14, 'Canada'),
 ('Jennifer Peterson', 15, 'Canada'),
 ('Robert Brown', 29, 'Canada'),
 ('Edward Francis', 30, 'Canada'),
 ('Martha Silk', 31, 'Canada'),
 ('Aaron Mitchell', 32, 'Canada'),
 ('Ellie Sullivan', 33, 'Canada'),
 ('João Fernandes', 34, 'Portugal'),
 ('Madalena Sampaio', 35, 'Portugal'),
 ('Hannah Schneider', 36, 'Germany'),
 ('Fynn Zimmermann', 37, 'Germany'),
 ('Niklas Schröder', 38, 'Germany'),
 ('Camille Bernard', 39, 'France'),
 ('Dominique Lefebvre', 40, 'France'),
 ('Marc Dubois', 41

In [0]:
## to validate the results we can use RDDs key-value pairs to check if the values againist each country(counts) are matching or not

In [0]:
display(
    pspk_rdd_cust
    .filter(lambda x: x['Country'] != 'USA')
    .map(lambda row: (row['FirstName']+" "+row["LastName"], row['CustomerId'], row['Country']))
    .map(lambda row: (row[2], 1))
    .reduceByKey(lambda x,y: x+y)
    .sortBy(lambda x: x[1], ascending=False)
    .collect()
    # .toDF()
)

_1,_2
Canada,8
Brazil,5
France,5
Germany,4
United Kingdom,3
Czech Republic,2
Portugal,2
India,2
Norway,1
Austria,1


#### 2. Provide a query showing the Invoices of customers who are from Brazil. The resultant table should show the customer's full name, Invoice ID, Date of the invoice and billing country.

In [0]:
## In above query we need to join multiple tables to get the required output, and in SQL we can join 2 tables and apply required filters to get the expected output.

##### SQL

In [0]:
%sql

select customers.FirstName || " " || customers.LastName custFullName, invoice.InvoiceId, invoice.InvoiceDate, invoice.BillingCountry
from customers 
  inner join invoice on (customers.CustomerId = invoice.CustomerId)
  where invoice.BillingCountry = "Brazil"

custFullName,InvoiceId,InvoiceDate,BillingCountry
Eduardo Martins,25,2009-04-09T00:00:00.000+0000,Brazil
Roberto Almeida,34,2009-05-23T00:00:00.000+0000,Brazil
Fernanda Ramos,35,2009-06-05T00:00:00.000+0000,Brazil
Alexandre Rocha,57,2009-09-06T00:00:00.000+0000,Brazil
Fernanda Ramos,58,2009-09-07T00:00:00.000+0000,Brazil
Alexandre Rocha,68,2009-10-17T00:00:00.000+0000,Brazil
Fernanda Ramos,80,2009-12-10T00:00:00.000+0000,Brazil
Luís Gonçalves,98,2010-03-11T00:00:00.000+0000,Brazil
Luís Gonçalves,121,2010-06-13T00:00:00.000+0000,Brazil
Alexandre Rocha,123,2010-06-17T00:00:00.000+0000,Brazil


In [0]:
%sql

select invoice.BillingCountry, count(invoice.BillingCountry) totalCounts
from customers 
  inner join invoice on (customers.CustomerId = invoice.CustomerId)
  where invoice.BillingCountry = "Brazil"
group by invoice.BillingCountry

BillingCountry,totalCounts
Brazil,35


##### Pyspark

In [0]:
## InPyspark the logic is almost similar we can use alias to the dataframes and then join it with other dataframes as show below and we can use sequence of chained statements to get the results

In [0]:
display(
    pspk_df_cust.alias("cust")
    .join(pspk_df_invoice.alias("inv"), on=['CustomerId'], how='inner')
    .filter("inv.BillingCountry = 'Brazil'")
    .selectExpr("cust.FirstName||' '||cust.LastName custFullName", "inv.invoiceId", "inv.invoiceDate", "inv.BillingCountry")
)

custFullName,invoiceId,invoiceDate,BillingCountry
Eduardo Martins,25,2009-04-09T00:00:00.000+0000,Brazil
Roberto Almeida,34,2009-05-23T00:00:00.000+0000,Brazil
Fernanda Ramos,35,2009-06-05T00:00:00.000+0000,Brazil
Alexandre Rocha,57,2009-09-06T00:00:00.000+0000,Brazil
Fernanda Ramos,58,2009-09-07T00:00:00.000+0000,Brazil
Alexandre Rocha,68,2009-10-17T00:00:00.000+0000,Brazil
Fernanda Ramos,80,2009-12-10T00:00:00.000+0000,Brazil
Luís Gonçalves,98,2010-03-11T00:00:00.000+0000,Brazil
Luís Gonçalves,121,2010-06-13T00:00:00.000+0000,Brazil
Alexandre Rocha,123,2010-06-17T00:00:00.000+0000,Brazil


In [0]:
display(
    pspk_df_cust.alias("cust")
    .join(pspk_df_invoice.alias("inv"), on=['CustomerId'], how='inner')
    .filter("inv.BillingCountry = 'Brazil'")
    # .selectExpr("cust.FirstName||' '||cust.LastName custFullName", "inv.invoiceId", "inv.invoiceDate", "inv.BillingCountry")
    .groupBy("inv.BillingCountry")
    .count()
)

BillingCountry,count
Brazil,35


##### Python

In [0]:
## to add a new column we can use assign where a new column is added to the existing dataframe without disturbing the structure
## and the equivalent of join for python is merge

In [0]:
(
    pd_df_cust
    .query("Country == 'Brazil'")
    .merge(pd_df_invoice, on='CustomerId', how='inner')
    .assign(custFullName = pd_df_cust['FirstName']+" "+pd_df_cust['LastName'])
    [['custFullName', 'InvoiceId', 'InvoiceDate', 'BillingCountry']]
)

Unnamed: 0,custFullName,InvoiceId,InvoiceDate,BillingCountry
0,Luís Gonçalves,98,2010-03-11,Brazil
1,Leonie Köhler,121,2010-06-13,Brazil
2,François Tremblay,143,2010-09-15,Brazil
3,Bjørn Hansen,195,2011-05-06,Brazil
4,František Wichterlová,316,2012-10-27,Brazil
5,Helena Holý,327,2012-12-07,Brazil
6,Astrid Gruber,382,2013-08-07,Brazil
7,Daan Peeters,25,2009-04-09,Brazil
8,Kara Nielsen,154,2010-11-14,Brazil
9,Eduardo Martins,177,2011-02-16,Brazil


In [0]:
(
    pd_df_cust
    .query("Country == 'Brazil'")
    .merge(pd_df_invoice, on='CustomerId', how='inner')
    .assign(custFullName = pd_df_cust['FirstName']+" "+pd_df_cust['LastName'])
    [['custFullName', 'InvoiceId', 'InvoiceDate', 'BillingCountry']]
    .groupby("BillingCountry")
    .agg(counts = ("BillingCountry", "count") ) # new column name is "counts" and the function to add for the column BillingCountry is count as show above which is similar to Pyspars groupBy and count
    .reset_index()
)

Unnamed: 0,BillingCountry,counts
0,Brazil,35


##### RDDs

In [0]:
## while for RDDs there from my obsrevation there is no direction way to get the required results. and for Joining Multiple RDDs,the RDD has to be in Key-Value pairs where the keys can be the joining column.

In [0]:
pspk_rdd_cust = pspk_df_cust.rdd
pspk_rdd_inv = pspk_df_invoice.rdd

In [0]:
## creating 2 RDD with similar keys for joining
pspk_rdd_cust_v1 = (
    pspk_rdd_cust
    .filter(lambda x: x['Country'] == 'Brazil')
    .map(lambda row: (row['CustomerId'], row['FirstName']+" " +row['LastName'])  )
)

pspk_rdd_inv_v1 = (
    pspk_rdd_inv
    .map(lambda row: (row['CustomerId'], [row['InvoiceId'], row['InvoiceDate'], row['BillingCountry'] ] ) )
    # .take(1)
)

In [0]:
(
    pspk_rdd_cust_v1
    .take(2)
)

Out[53]: [(1, 'Luís Gonçalves'), (10, 'Eduardo Martins')]

In [0]:
(
    pspk_rdd_inv_v1
    .filter(lambda x: x[0] == 10)
    .take(2)
)

Out[55]: [(10, [25, datetime.datetime(2009, 4, 9, 0, 0), 'Brazil']),
 (10, [154, datetime.datetime(2010, 11, 14, 0, 0), 'Brazil'])]

In [0]:
(
    pspk_rdd_cust_v1
    .join(pspk_rdd_inv_v1)
    # .flatMap(lambda x: x) # on using this function then the resultant key-value pairs are furthur reduced. initially the output has35 key-value pairs and on using flatMap the values are increased to 70
    .collect()
)

Out[70]: [(10,
  ('Eduardo Martins', [25, datetime.datetime(2009, 4, 9, 0, 0), 'Brazil'])),
 (10,
  ('Eduardo Martins', [154, datetime.datetime(2010, 11, 14, 0, 0), 'Brazil'])),
 (10,
  ('Eduardo Martins', [177, datetime.datetime(2011, 2, 16, 0, 0), 'Brazil'])),
 (10,
  ('Eduardo Martins', [199, datetime.datetime(2011, 5, 21, 0, 0), 'Brazil'])),
 (10,
  ('Eduardo Martins', [251, datetime.datetime(2012, 1, 9, 0, 0), 'Brazil'])),
 (10,
  ('Eduardo Martins', [372, datetime.datetime(2013, 7, 2, 0, 0), 'Brazil'])),
 (10,
  ('Eduardo Martins', [383, datetime.datetime(2013, 8, 12, 0, 0), 'Brazil'])),
 (12,
  ('Roberto Almeida', [34, datetime.datetime(2009, 5, 23, 0, 0), 'Brazil'])),
 (12,
  ('Roberto Almeida', [155, datetime.datetime(2010, 11, 14, 0, 0), 'Brazil'])),
 (12,
  ('Roberto Almeida', [166, datetime.datetime(2010, 12, 25, 0, 0), 'Brazil'])),
 (12,
  ('Roberto Almeida', [221, datetime.datetime(2011, 8, 25, 0, 0), 'Brazil'])),
 (12,
  ('Roberto Almeida', [350, datetime.datetime(2013, 

In [0]:
(
    pspk_rdd_cust_v1
    .join(pspk_rdd_inv_v1)
    .map(lambda x: (x[1][1][2], 1))
    .reduceByKey(lambda x,y: x+y)
    .collect()
)

Out[79]: [('Brazil', 35)]

#### 3. Provide a query that shows the invoices associated with each sales agent. The resultant table should include the Sales Agent's full name.

##### SQL

In [0]:
## for this query we need to join 3 tables
# employees -- to get sales agent names
# customers -- invoice are used to get the relationship between them

In [0]:
%sql

select employees.FirstName || " " || employees.LastName salesAgentName,
  invoice.InvoiceId
from employees
  inner join customers on employees.EmployeeId = customers.SupportRepId
  inner join invoice on customers.CustomerId = invoice.CustomerId

salesAgentName,InvoiceId
Jane Peacock,382
Jane Peacock,327
Jane Peacock,316
Jane Peacock,195
Jane Peacock,143
Jane Peacock,121
Jane Peacock,98
Steve Johnson,293
Steve Johnson,241
Steve Johnson,219


In [0]:
%sql

with tb1 as (
  select employees.FirstName || " " || employees.LastName salesAgentName,
    invoice.InvoiceId
  from employees
    inner join customers on employees.EmployeeId = customers.SupportRepId
    inner join invoice on customers.CustomerId = invoice.CustomerId
) select salesAgentName, count(*) total_invoices from tb1
group by salesAgentName
order by salesAgentName

salesAgentName,total_invoices
Jane Peacock,146
Margaret Park,140
Steve Johnson,126


##### Pyspark

In [0]:
display(
    pspk_df_emp.alias("emp")
    .join(pspk_df_cust.alias("cust")).where("emp.EmployeeId == cust.SupportRepId")
    .join(pspk_df_invoice.alias("invoice")).where("invoice.customerId == cust.customerId")
    .selectExpr("emp.FirstName || ' ' || emp.LastName as salesAgentName", "invoice.invoiceId")
)

salesAgentName,invoiceId
Jane Peacock,382
Jane Peacock,327
Jane Peacock,316
Jane Peacock,195
Jane Peacock,143
Jane Peacock,121
Jane Peacock,98
Steve Johnson,293
Steve Johnson,241
Steve Johnson,219


In [0]:
display(
    pspk_df_emp.alias("emp")
    .join(pspk_df_cust.alias("cust")).where("emp.EmployeeId == cust.SupportRepId")
    .join(pspk_df_invoice.alias("invoice")).where("invoice.customerId == cust.customerId")
    .selectExpr("emp.FirstName || ' ' || emp.LastName as salesAgentName", "invoice.invoiceId")
    .groupBy("salesAgentName")
    .count()
)

salesAgentName,count
Jane Peacock,146
Steve Johnson,126
Margaret Park,140


##### Python

In [0]:
(
    pd_df_emp
    .assign(salesAgentName = pd_df_emp['FirstName'] +" " +pd_df_emp['LastName'])
    .merge(pd_df_cust, left_on = 'EmployeeId', right_on = 'SupportRepId', how = 'inner')
    .merge(pd_df_invoice, left_on = 'CustomerId', right_on = 'CustomerId', how = 'inner')
    [['salesAgentName', 'InvoiceId']]
    .reset_index(drop=True)
)


Unnamed: 0,salesAgentName,InvoiceId
0,Jane Peacock,98
1,Jane Peacock,121
2,Jane Peacock,143
3,Jane Peacock,195
4,Jane Peacock,316
...,...,...
407,Steve Johnson,88
408,Steve Johnson,217
409,Steve Johnson,240
410,Steve Johnson,262


In [0]:
(
    pd_df_emp
    .assign(salesAgentName = pd_df_emp['FirstName'] +" " +pd_df_emp['LastName'])
    .merge(pd_df_cust, left_on = 'EmployeeId', right_on = 'SupportRepId', how = 'inner')
    .merge(pd_df_invoice, left_on = 'CustomerId', right_on = 'CustomerId', how = 'inner')
    [['salesAgentName', 'InvoiceId']]
    .groupby("salesAgentName")
    .count()
    .sort_values(by=['InvoiceId'])
    .reset_index()
)


Unnamed: 0,salesAgentName,InvoiceId
0,Steve Johnson,126
1,Margaret Park,140
2,Jane Peacock,146


##### RDDs

In [0]:
pspk_rdd_emp = pspk_df_emp.rdd
pspk_rdd_cust = pspk_df_cust.rdd
pspk_rdd_inv = pspk_df_invoice.rdd

In [0]:
# as we need to join 3 tables we need to create 3 RDDs having similar key-value pairs (keys being the common joining condition) 
pspk_rdd_emp_mod = (
    pspk_rdd_emp
    # .filter(lambda x: x['EmployeeId']==3)
    .map(lambda x: (x['EmployeeId'], x['FirstName'] +" " +x['LastName']) )
)
pspk_rdd_cust_mod = (
    pspk_rdd_cust
    # .filter(lambda x: x['SupportRepId']==3)
    .map(lambda x: (x['SupportRepId'], x['CustomerId']) )
)
pspk_rdd_inv_mod = (
    pspk_rdd_inv
    .map(lambda x: (x['CustomerId'], x['InvoiceId']) )
)

In [0]:
(
    pspk_rdd_emp_mod
    .join(pspk_rdd_cust_mod)
    .map(lambda x: x[1])
    .map(lambda x: (x[1], x[0] ) )
    .join(pspk_rdd_inv_mod)
    .map(lambda x: x[1])
    .countByKey()
    # .take(4)
)

Out[89]: defaultdict(int,
            {'Margaret Park': 140, 'Jane Peacock': 146, 'Steve Johnson': 126})

### 4. How many Invoices were there in 2009 and 2011? What are the respective total sales for each of those years?

#### SQL

In [0]:
%sql

select year(invoicedate) yr, count(*) total_sales
from invoice
where year(invoicedate) in ('2009', '2011')
group by yr

yr,total_sales
2009,83
2011,83


#### Pyspark

In [0]:
display(
    pspk_df_invoice
    .withColumn("invYear", year(col("InvoiceDate")))
    .filter("invYear in ('2009', '2011') ")
    .groupBy("invYear")
    .count()
)

invYear,count
2009,83
2011,83


In [0]:
display(
    pspk_df_invoice
    .withColumn("invYear", year(col("InvoiceDate")))
    .filter("invYear in ('2009', '2011') ")
    .groupBy("invYear")
    .count()
    # .pivot("invYear").sum("count")
)

invYear,count
2009,83
2011,83


#### Python

In [0]:
pd_df_invoice = (
    pd_df_invoice
    .assign(InvYear = pd_df_invoice['InvoiceDate'].dt.year.astype(str))
)

In [0]:
(
    pd_df_invoice[ (pd_df_invoice['InvYear'] == '2009') | (pd_df_invoice['InvYear'] == '2011') ]
    .groupby("InvYear")['InvYear']
    .count()
)

Out[94]: InvYear
2009    83
2011    83
Name: InvYear, dtype: int64

#### RDDs

In [0]:
def parseTimeToDate(inDt):
    dt = datetime.datetime.strftime(inDt, "%Y")
    return dt

parseTimeToDate(datetime.datetime(2009, 1, 1, 0, 0))

Out[95]: '2009'

In [0]:
pspk_df_invoice_rdd = pspk_df_invoice.rdd

In [0]:
(
    pspk_df_invoice_rdd
    .map(lambda x: (parseTimeToDate(x['InvoiceDate']), 1))
    .reduceByKey(lambda x,y: x+y)
    .filter(lambda x: x[0] in ['2009', '2011'] )
    .collect()
)

Out[97]: [('2009', 83), ('2011', 83)]