In [1]:
import pandas as pd
import numpy as np

In [3]:
pdf = pd.DataFrame.from_items([('A', [1, 2, 3]), ('B', [4, 5, 6])])
pdf

Unnamed: 0,A,B
0,1,4
1,2,5
2,3,6


In [4]:
pdf.A

0    1
1    2
2    3
Name: A, dtype: int64

In [6]:
df = spark.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"])
df.show()                           

+---+---+
|  A|  B|
+---+---+
|  1|  4|
|  2|  5|
|  3|  6|
+---+---+



In [10]:
print(pdf.A)
print(pdf['A'])
print(df.A)
print(df['A'])

0    1
1    2
2    3
Name: A, dtype: int64
0    1
1    2
2    3
Name: A, dtype: int64
Column<b'A'>
Column<b'A'>


In [11]:
pdf['C'] = 0
pdf

Unnamed: 0,A,B,C
0,1,4,0
1,2,5,0
2,3,6,0


In [14]:
df = df.withColumn('C', 0)

AssertionError: col should be Column

In [13]:
from pyspark.sql import functions as F
df = df.withColumn('C', F.lit(0))
df.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  4|  0|
|  2|  5|  0|
|  3|  6|  0|
+---+---+---+



In [15]:
df.withColumn('C', df.B > 0).show()

+---+---+----+
|  A|  B|   C|
+---+---+----+
|  1|  4|true|
|  2|  5|true|
|  3|  6|true|
+---+---+----+



In [16]:
df.withColumn('D', df.A * df.B).show()

+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  1|  4|  0|  4|
|  2|  5|  0| 10|
|  3|  6|  0| 18|
+---+---+---+---+



In [19]:
df.select(df.C > 0).show()

+-------+
|(C > 0)|
+-------+
|  false|
|  false|
|  false|
+-------+



In [21]:
df.select('B' > 0).show()

TypeError: unorderable types: str() > int()

In [22]:
df.select(df['B'] > 0).show()

+-------+
|(B > 0)|
+-------+
|   true|
|   true|
|   true|
+-------+



In [25]:
df.select((df.B > 0).alias("is_positive")).show()

+-----------+
|is_positive|
+-----------+
|       true|
|       true|
|       true|
+-----------+



In [27]:
pdf[(pdf.A > 1) & (pdf.B > 4)]

Unnamed: 0,A,B,C
1,2,5,0
2,3,6,0


In [30]:
df.filter((df.A > 1) & (df.B > 4)).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  2|  5|  0|
|  3|  6|  0|
+---+---+---+



In [33]:
df[(df.A > 1) & (df.B > 4)].show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  2|  5|  0|
|  3|  6|  0|
+---+---+---+



In [34]:
spark.createDataFrame(pdf).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  4|  0|
|  2|  5|  0|
|  3|  6|  0|
+---+---+---+



In [36]:
df.groupBy('A').avg('B').show()

+---+------+
|  A|avg(B)|
+---+------+
|  1|   4.0|
|  3|   6.0|
|  2|   5.0|
+---+------+



In [42]:
pdf.groupby('A')['B'].mean()

A
1    4
2    5
3    6
Name: B, dtype: int64

In [48]:
df.groupby('A').avg('B').describe().show()

+-------+---+------+
|summary|  A|avg(B)|
+-------+---+------+
|  count|  3|     3|
|   mean|2.0|   5.0|
| stddev|1.0|   1.0|
|    min|  1|   4.0|
|    max|  3|   6.0|
+-------+---+------+



In [49]:
df.groupBy('A').agg(F.count('B'), F.max('B'), F.min('C')).show()

+---+--------+------+------+
|  A|count(B)|max(B)|min(C)|
+---+--------+------+------+
|  1|       1|     4|     0|
|  3|       1|     6|     0|
|  2|       1|     5|     0|
+---+--------+------+------+



In [60]:
pdf.groupby('A')['B'].agg(['count', 'max','mean']) 

Unnamed: 0_level_0,count,max,mean
A,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,1,4,4
2,1,5,5
3,1,6,6


In [64]:
pdf.groupby('A').agg({'B':'count', 'C':'min'})

Unnamed: 0_level_0,B,C
A,Unnamed: 1_level_1,Unnamed: 2_level_1
1,1,0
2,1,0
3,1,0


In [65]:
df = sqlCtx.createDataFrame([(1, 4), (1, 5), (2, 6), (2, 6), (3, 0)], ["A", "B"])
pdf = df.toPandas()
pdf

Unnamed: 0,A,B
0,1,4
1,1,5
2,2,6
3,2,6
4,3,0


In [66]:
pdf['diff'] = pdf.B.diff()
pdf

Unnamed: 0,A,B,diff
0,1,4,
1,1,5,1.0
2,2,6,1.0
3,2,6,0.0
4,3,0,-6.0


In [67]:
from pyspark.sql import Window

window_over_A = Window.partitionBy('A').orderBy('B')
df.withColumn('diff', F.lead('B').over(window_over_A) - df.B).show()

+---+---+----+
|  A|  B|diff|
+---+---+----+
|  1|  4|   1|
|  1|  5|null|
|  3|  0|null|
|  2|  6|   0|
|  2|  6|null|
+---+---+----+



In [3]:

import sys
import types
import pandas as pd
from botocore.client import Config
import ibm_boto3

def __iter__(self): return 0

# @hidden_cell
# The following code accesses a file in your IBM Cloud Object Storage. It includes your credentials.
# You might want to remove those credentials before you share your notebook.
client_9fa254825cf748af8c99d9684609cb33 = ibm_boto3.client(service_name='s3',
    ibm_api_key_id='pGtcK4Cfe0bYhkzj57xpd52SoW35u9AminIgvd2fgQxN',
    ibm_auth_endpoint="https://iam.ng.bluemix.net/oidc/token",
    config=Config(signature_version='oauth'),
    endpoint_url='https://s3-api.us-geo.objectstorage.service.networklayer.com')

body = client_9fa254825cf748af8c99d9684609cb33.get_object(Bucket='sparkscalapythonplayground-donotdelete-pr-pmmafeibz2hwiv',Key='P00000001-ALL.csv')['Body']
# add missing __iter__ method, so pandas accepts body as file-like object
if not hasattr(body, "__iter__"): body.__iter__ = types.MethodType( __iter__, body )

p_pdf = pd.read_csv(body)
p_pdf.head()



  interactivity=interactivity, compiler=compiler, result=result)


Unnamed: 0,cmte_id,cand_id,cand_nm,contbr_nm,contbr_city,contbr_st,contbr_zip,contbr_employer,contbr_occupation,contb_receipt_amt,contb_receipt_dt,receipt_desc,memo_cd,memo_text,form_tp,file_num
0,C00410118,P20002978,"Bachmann, Michelle","HARVEY, WILLIAM",MOBILE,AL,366010000.0,RETIRED,RETIRED,250.0,20-JUN-11,,,,SA17A,736166
1,C00410118,P20002978,"Bachmann, Michelle","HARVEY, WILLIAM",MOBILE,AL,366010000.0,RETIRED,RETIRED,50.0,23-JUN-11,,,,SA17A,736166
2,C00410118,P20002978,"Bachmann, Michelle","SMITH, LANIER",LANETT,AL,368633000.0,INFORMATION REQUESTED,INFORMATION REQUESTED,250.0,05-JUL-11,,,,SA17A,749073
3,C00410118,P20002978,"Bachmann, Michelle","BLEVINS, DARONDA",PIGGOTT,AR,724548000.0,NONE,RETIRED,250.0,01-AUG-11,,,,SA17A,749073
4,C00410118,P20002978,"Bachmann, Michelle","WARDENBURG, HAROLD",HOT SPRINGS NATION,AR,719016000.0,NONE,RETIRED,300.0,20-JUN-11,,,,SA17A,736166


In [4]:

import ibmos2spark

# @hidden_cell
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'api_key': 'pGtcK4Cfe0bYhkzj57xpd52SoW35u9AminIgvd2fgQxN',
    'service_id': 'iam-ServiceId-8270013a-9afb-4c42-9b56-f62850a9c0f4',
    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token'}

configuration_name = 'os_9fa254825cf748af8c99d9684609cb33_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
p_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('P00000001-ALL.csv', 'sparkscalapythonplayground-donotdelete-pr-pmmafeibz2hwiv'))
p_df.take(5)


[Row(cmte_id='C00410118', cand_id='P20002978', cand_nm='Bachmann, Michelle', contbr_nm='HARVEY, WILLIAM', contbr_city='MOBILE', contbr_st='AL', contbr_zip='366010290', contbr_employer='RETIRED', contbr_occupation='RETIRED', contb_receipt_amt='250', contb_receipt_dt='20-JUN-11', receipt_desc=None, memo_cd=None, memo_text=None, form_tp='SA17A', file_num='736166'),
 Row(cmte_id='C00410118', cand_id='P20002978', cand_nm='Bachmann, Michelle', contbr_nm='HARVEY, WILLIAM', contbr_city='MOBILE', contbr_st='AL', contbr_zip='366010290', contbr_employer='RETIRED', contbr_occupation='RETIRED', contb_receipt_amt='50', contb_receipt_dt='23-JUN-11', receipt_desc=None, memo_cd=None, memo_text=None, form_tp='SA17A', file_num='736166'),
 Row(cmte_id='C00410118', cand_id='P20002978', cand_nm='Bachmann, Michelle', contbr_nm='SMITH, LANIER', contbr_city='LANETT', contbr_st='AL', contbr_zip='368633403', contbr_employer='INFORMATION REQUESTED', contbr_occupation='INFORMATION REQUESTED', contb_receipt_amt='25

In [5]:
p_df[p_df.cmte_id == 'C00410118'].count()

411

# Cast contb_receipt_amt column to double type

In [6]:
from pyspark.sql.types import DoubleType
p_df_cast = p_df.withColumn('contb_receipt_amt_num', p_df.contb_receipt_amt.cast(DoubleType())).drop('contb_receipt_amt')
p_df_cast.show(5)

+---------+---------+------------------+------------------+------------------+---------+----------+--------------------+--------------------+----------------+------------+-------+---------+-------+--------+---------------------+
|  cmte_id|  cand_id|           cand_nm|         contbr_nm|       contbr_city|contbr_st|contbr_zip|     contbr_employer|   contbr_occupation|contb_receipt_dt|receipt_desc|memo_cd|memo_text|form_tp|file_num|contb_receipt_amt_num|
+---------+---------+------------------+------------------+------------------+---------+----------+--------------------+--------------------+----------------+------------+-------+---------+-------+--------+---------------------+
|C00410118|P20002978|Bachmann, Michelle|   HARVEY, WILLIAM|            MOBILE|       AL| 366010290|             RETIRED|             RETIRED|       20-JUN-11|        null|   null|     null|  SA17A|  736166|                250.0|
|C00410118|P20002978|Bachmann, Michelle|   HARVEY, WILLIAM|            MOBILE|      

# Which candidate received the highest amount as contribution

In [61]:
summed = p_pdf[['cand_nm', 'contb_receipt_amt']].groupby('cand_nm').sum()
summed.loc[summed.idxmax()]

Index(['contb_receipt_amt'], dtype='object')

In [76]:
from pyspark.sql import functions as F
p_df_cast_summed = p_df_cast[['cand_nm', 'contb_receipt_amt_num']].groupBy('cand_nm').agg(F.sum('contb_receipt_amt_num').alias('Total'))
max_contrib = p_df_cast_summed.withColumn('tempid', F.lit(0)).groupBy('tempid').agg(F.max('Total').alias('Max Contrib')).select('Max Contrib')

['cand_nm', 'Total']


In [87]:
#print(max_contrib.first()[0])
p_df_cast_summed.select('cand_nm', 'Total').where(p_df_cast_summed.Total == max_contrib.first()[0]).show()

+-------------+--------------------+
|      cand_nm|               Total|
+-------------+--------------------+
|Obama, Barack|1.3350228644999972E8|
+-------------+--------------------+



# Which city contributed the most to Barack Obama

In [58]:
p_pdf_summed = p_pdf[p_pdf.cand_nm == 'Obama, Barack'][['contbr_city', 'contb_receipt_amt']].groupby('contbr_city').sum()
p_pdf_summed.loc[p_pdf_summed.idxmax()]

Unnamed: 0_level_0,contb_receipt_amt
contbr_city,Unnamed: 1_level_1
CHICAGO,12250782.12


In [17]:
from pyspark.sql import functions as F
city_contribs = p_df_cast[p_df_cast.cand_nm == 'Obama, Barack'][['contbr_city', 'contb_receipt_amt_num']].groupBy('contbr_city').agg(F.sum('contb_receipt_amt_num').alias('Total'))
city_contribs.cache()

In [94]:
city_max_contri = city_contribs.withColumn('tempId', F.lit(0)).groupBy('tempId').agg(F.max('Total').alias('Max Contribution')).select('Max Contribution')

+------+--------------------+
|tempId|    Max Contribution|
+------+--------------------+
|     0|1.2250782119999992E7|
+------+--------------------+



In [96]:
city_contribs.select(city_contribs.contbr_city, city_contribs.Total).where(city_contribs.Total == city_max_contri.first()[0]).show()

+--------------+-----+
|   contbr_city|Total|
+--------------+-----+
|  BELLEAIR BLF|  0.0|
|      RIDGELEY|  0.0|
|EGGHARBOR CITY|  0.0|
|      LONGBOAT|  0.0|
|      AMANHEIM|  0.0|
|   LITTLE YORK|  0.0|
|EARLBANKS1935@|  0.0|
|      FISHTAIL|  0.0|
|       ANDENNE|  0.0|
|          DREW|  0.0|
|       BALDWYN|  0.0|
|       BIDWELL|  0.0|
|     CHALMETTE|  0.0|
+--------------+-----+

