In [1]:
from pyspark import SparkContext, SparkConf

# prepare the RDD
data_file = "./startup_funding.csv"

conf = SparkConf().setAppName("StartupFunding")
sc = SparkContext(conf=conf)

raw_data = sc.textFile(data_file)


In [2]:
print "Total records in Raw data = {}".format(raw_data.count())

Total records in Raw data = 2373


In [3]:
header = raw_data.first()
header = sc.parallelize([header])

In [4]:
clean_data = raw_data.subtract(header)

In [5]:
print "Total records in Clean data = {}".format(clean_data.count())

Total records in Clean data = 2372


In [6]:
clean_data

PythonRDD[13] at RDD at PythonRDD.scala:48

In [7]:
csv = clean_data.map(lambda x: x.split(','))

In [8]:
csv.count()

2372

In [9]:
cities = csv.map(lambda x: x[5]).collect()

In [10]:
amounts = csv.map(lambda x: x[8]).collect()

In [31]:
amounts_by_cities = csv.map(lambda x: (x[5], float(x[8])))

amounts_by_cities.take(5)


[(u'Jaipur', 1000000.0),
 (u'Noida', 500000.0),
 (u'Bangalore', 12300000.0),
 (u'New Delhi', 200000.0),
 (u'Mumbai', 1000000.0)]

In [32]:
total_amounts_by_cities = amounts_by_cities.reduceByKey(lambda x,y : float(x) + float(y))

In [33]:
total_amounts_by_cities.collect()

[(u'USA/India', 16600000.0),
 (u'Gurgaon', 2069021500.0),
 (u'Noida', 170638000.0),
 (u'Lucknow', 1000000.0),
 (u'India / US', 30000000.0),
 (u'Pune', 366653000.0),
 (u'Vadodara', 6040000.0),
 (u'Kerala', 27000.0),
 (u'Kolkata', 13865000.0),
 (u'Hyderabad', 196362000.0),
 (u'Hubli', 0.0),
 (u'Mumbai', 2354934500.0),
 (u'Unknown', 1271863868.0),
 (u'Missourie', 350000.0),
 (u'Panaji', 825000.0),
 (u'Indore', 1672000.0),
 (u'Surat', 0.0),
 (u'Goa', 2380000.0),
 (u'Gwalior', 900000.0),
 (u'Belgaum', 500000.0),
 (u'US/India', 3000000.0),
 (u'Trivandrum', 100000.0),
 (u'Bhopal', 1900000.0),
 (u'Siliguri', 0.0),
 (u'Udupi', 12000000.0),
 (u'bangalore', 0.0),
 (u'Karur', 0.0),
 (u'Coimbatore', 1650000.0),
 (u'Chennai', 411105000.0),
 (u'Kanpur', 220000.0),
 (u'Singapore', 3850000.0),
 (u'Boston', 3000000.0),
 (u'Varanasi', 52000.0),
 (u'Nagpur', 0.0),
 (u'Goa/Hyderabad', 0.0),
 (u'Udaipur', 0.0),
 (u'Agra', 0.0),
 (u'USA', 0.0),
 (u'New York/ India', 2950000.0),
 (u'US', 0.0),
 (u'London', 0.

In [34]:
type(total_amounts_by_cities)

pyspark.rdd.PipelinedRDD

In [35]:
x_cities = total_amounts_by_cities.map(lambda x: x[0]).collect()
x_cities

[u'USA/India',
 u'Gurgaon',
 u'Noida',
 u'Lucknow',
 u'India / US',
 u'Pune',
 u'Vadodara',
 u'Kerala',
 u'Kolkata',
 u'Hyderabad',
 u'Hubli',
 u'Mumbai',
 u'Unknown',
 u'Missourie',
 u'Panaji',
 u'Indore',
 u'Surat',
 u'Goa',
 u'Gwalior',
 u'Belgaum',
 u'US/India',
 u'Trivandrum',
 u'Bhopal',
 u'Siliguri',
 u'Udupi',
 u'bangalore',
 u'Karur',
 u'Coimbatore',
 u'Chennai',
 u'Kanpur',
 u'Singapore',
 u'Boston',
 u'Varanasi',
 u'Nagpur',
 u'Goa/Hyderabad',
 u'Udaipur',
 u'Agra',
 u'USA',
 u'New York/ India',
 u'US',
 u'London',
 u'Bangalore',
 u'Ahmedabad',
 u'Chandigarh',
 u'Jodhpur',
 u'New Delhi',
 u'Delhi',
 u'Kozhikode',
 u'Jaipur',
 u'Kochi']

In [36]:
y_amounts = total_amounts_by_cities.map(lambda x: x[1]).collect()
y_amounts

[16600000.0,
 2069021500.0,
 170638000.0,
 1000000.0,
 30000000.0,
 366653000.0,
 6040000.0,
 27000.0,
 13865000.0,
 196362000.0,
 0.0,
 2354934500.0,
 1271863868.0,
 350000.0,
 825000.0,
 1672000.0,
 0.0,
 2380000.0,
 900000.0,
 500000.0,
 3000000.0,
 100000.0,
 1900000.0,
 0.0,
 12000000.0,
 0.0,
 0.0,
 1650000.0,
 411105000.0,
 220000.0,
 3850000.0,
 3000000.0,
 52000.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 2950000.0,
 0.0,
 0.0,
 8425674108.0,
 98186000.0,
 26100000.0,
 160000.0,
 2817247500.0,
 1000000.0,
 0.0,
 35560000.0,
 0.0]

In [38]:
# Plot the city-wise funding received
import matplotlib.pyplot as plt

index = range(len(x_cities))

plt.bar(index, y_amounts)
plt.xticks(index, x_cities, rotation=30, fontsize=6)
plt.xlabel('Cities', fontsize=6)
plt.ylabel('Funding Amount')
plt.title('Funding Amounts by Cities')
 
plt.show()