## Kiva Crowdfunding Queries with Apache Spark
#### Kaggle's competition
*[...] In Kaggle Datasets' inaugural Data Science for Good challenge, Kiva is inviting the Kaggle community to help them build more localized models to estimate the poverty levels of residents in the regions where Kiva has active loans. Unlike traditional machine learning competitions with rigid evaluation criteria, participants will develop their own creative approaches to addressing the objective. Instead of making a prediction file as in a supervised machine learning problem, submissions in this challenge will take the form of Python and/or R data analyses using Kernels, Kaggle's hosted Jupyter Notebooks-based workbench.*

[More Information](https://www.kaggle.com/kiva/data-science-for-good-kiva-crowdfunding)


This project uses four CSV files. You can find the files [here](https://www.kaggle.com/kiva/data-science-for-good-kiva-crowdfunding/data) and a description of the columns [here](https://www.kaggle.com/kiva/data-science-for-good-kiva-crowdfunding/discussion/50585). The goal is to analyze the data in as much depth as possible, running many queries to extract useful insights.


##### Tech issues
Kaggle provides the data as CSV files, but Apache Spark 2.2.0 has some issues with null values when reading CSV files. Furthermore, once the dataset is loaded, its schema consists of String Type values only. Changing the schema with StructType and fields did not overcome these obstacles, so I had to convert the CSV files to Parquet ("*Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.*"). The two functions below convert CSV files to Parquet files.



###### This project has been developed in a Databricks notebook, using a Databricks cluster (Free 6GB Memory, 0.88 Cores, 1 DBU)

In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('kiva').getOrCreate()


In [3]:
#kiva_loans = spark.read.csv('/FileStore/tables/kiva_loans.csv', inferSchema = True, header = True)
#kiva_regions = spark.read.csv('/FileStore/tables/kiva_mpi_region_locations.csv', inferSchema = True, header = True)
#loan_themes = spark.read.csv('/FileStore/tables/loan_theme_ids.csv', inferSchema = True, header = True)
#themes_region = spark.read.csv('/FileStore/tables/loan_themes_by_region.csv', inferSchema = True, header = True)

In [4]:
import pandas as pd
import re
# I/O for both functions below
# csv: path of the csv file to be converted
# parquet: file name of the parquet file to be written

def Parquer1(csv, parquet):
    
    # load csv file
    file = pd.read_csv(csv)
    # save to parquet
    file.to_parquet(parquet)
    
    return("Conversion Done! Ready to load '.parquet' files...")

# same conversion, but white spaces in column names are replaced with underscores first
def Parquer2(csv, parquet):
    
    # load csv file
    file = pd.read_csv(csv)
    columns = list(file.columns)
    columns = [re.sub(' ', '_', item) for item in columns]
    file.columns = columns
    # save to parquet
    file.to_parquet(parquet)
    
    return("Conversion Done! Ready to load '.parquet' files...")

In [5]:
from csv2parquet import Parquer

Parquer('kiva_loans.csv', 'kiva_loans.parquet')
Parquer('kiva_mpi_region_locations.csv', 'kiva_mpi_region_locations.parquet')
Parquer('loan_theme_ids.csv', 'loan_theme_ids.parquet')
Parquer('loan_themes_by_region.csv', 'loan_themes_by_region.parquet')

I ran these conversions in Anaconda Spyder and then uploaded the resulting files to the Databricks Data File Store.
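
The column renaming step can be tried without pandas or Spark. The sketch below is only an illustration: `normalize_columns` is a hypothetical helper, the sample names are examples, and it combines the underscore replacement done in `Parquer2` with the lowercasing that the notebook applies later in Spark.

```python
import re

def normalize_columns(columns):
    """Replace runs of white space with '_' and lowercase each column name."""
    return [re.sub(r"\s+", "_", name.strip()).lower() for name in columns]

# illustrative column names, not read from the real files
sample = ["Partner ID", "Field Partner Name", "rural_pct"]
print(normalize_columns(sample))
```

Names without white space pass through unchanged apart from the lowercasing, so the helper can be applied to every file.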

In [7]:
# 1 / Kiva Loans

kiva_loans = spark.read.parquet('/FileStore/tables/kiva_loans.parquet')

In [8]:
kiva_loans.printSchema()

In [9]:
# display some rows

display(kiva_loans.take(5))

In [10]:
from pyspark.sql.functions import split

kiva_loans = kiva_loans.withColumn('post_date', split('posted_time', " ")[0])\
                       .withColumn('post_time', split('posted_time', " ")[1])\
                       .withColumn('disburse_date', split('disbursed_time', " ")[0])\
                       .withColumn('disburse_time', split('disbursed_time', " ")[1])\
                       .withColumn('fund_date', split('funded_time', " ")[0])\
                       .withColumn('fund_time', split('funded_time', " ")[1])

# the 'date' column holds the same value as post_date, so we keep only post_date

kiva_loans = kiva_loans.drop('posted_time', 'disbursed_time', 'funded_time','__index_level_0__', 'date')
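
A plain-Python sketch of what the `split(..., " ")[0]` and `[1]` expressions do to a single timestamp string. The sample value is an assumption about the timestamp format and `split_timestamp` is a hypothetical helper, used only for illustration.

```python
def split_timestamp(value):
    """Split 'date time' on the first space; missing parts become None."""
    if value is None:
        return (None, None)
    parts = value.split(" ")
    date_part = parts[0]
    time_part = parts[1] if len(parts) > 1 else None
    return (date_part, time_part)

# assumed sample value, not taken from the dataset
print(split_timestamp("2014-01-01 06:12:39+00:00"))
print(split_timestamp(None))
```

A missing timestamp yields a missing date and a missing time, which matters later when the date columns are aggregated or charted.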


In [11]:
kiva_loans.printSchema()

In [12]:
from pyspark.sql.functions import *

In [13]:
# fully funded loans (funded_amount = loan_amount)

kiva_loans.select('funded_amount','loan_amount').filter("funded_amount = loan_amount").count()


In [14]:
# underfunded loans

print('Number of underfunded loans:',kiva_loans.select('funded_amount','loan_amount','use').filter("funded_amount < loan_amount").count(),'in total of:', kiva_loans.count())

# check fund_date: these loans were never fully funded

kiva_loans.select('funded_amount','loan_amount','disburse_date','fund_date','use').filter("funded_amount < loan_amount").show(25,truncate = False)


In [15]:
# Overfunded loans

kiva_loans.select('funded_amount','loan_amount','use').filter("funded_amount > loan_amount").show(truncate = False)

In [16]:
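# loans that received no funds at all
# display() renders the DataFrame itself and returns nothing, so .show() must not be chained onto it

display(kiva_loans.select('country','region','sector','use','loan_amount','funded_amount').filter("funded_amount = 0"))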

In [17]:
# most of the unfunded loans are in the USA

kiva_loans.select('country').filter("funded_amount = 0").groupBy('country').count().orderBy('count', ascending = False).show(10)
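
The four funding cases queried above can be summarized in one small function. This is a sketch on made-up amounts, not part of the Spark pipeline; `funding_status` is a hypothetical helper, and unlike the `funded_amount < loan_amount` filter it reports unfunded loans as their own category.

```python
from collections import Counter

def funding_status(funded_amount, loan_amount):
    """Classify one loan by comparing what was funded with what was requested."""
    if funded_amount == 0:
        return "unfunded"
    if funded_amount < loan_amount:
        return "underfunded"
    if funded_amount == loan_amount:
        return "fully funded"
    return "overfunded"

# made-up (funded_amount, loan_amount) pairs
loans = [(300.0, 300.0), (0.0, 500.0), (125.0, 400.0), (450.0, 425.0), (1000.0, 1000.0)]
status_counts = Counter(funding_status(f, l) for f, l in loans)
print(status_counts)
```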

In [18]:
# most frequent countries and total funds per country

kiva_loans.select('country','funded_amount','lender_count').groupBy("country")\
          .agg(count('country').alias('count'),sum('funded_amount').alias('total_funds'),sum('lender_count').alias('total_lenders'))\
          .orderBy('count',ascending = False).show(truncate = False)

# most frequent regions and total funds per region

kiva_loans.select('region','country_code','funded_amount','lender_count').groupBy("region","country_code")\
          .agg(count('region').alias('count'),sum('funded_amount').alias('total_funds'),sum('lender_count').alias('total_lenders'))\
          .orderBy('count',ascending = False).show(truncate = False)

In [19]:
# number of borrowers

kiva_loans = kiva_loans.withColumn('num_of_borrowers', size(split('borrower_genders', ", ")))

kiva_loans.select('borrower_genders','num_of_borrowers').show(10)


In [20]:
# loan with the biggest lender count

kiva_loans.select(max('lender_count')).show()

kiva_loans.select('lender_count','partner_id','funded_amount','loan_amount','activity','country','region','currency','term_in_months',
                  'repayment_interval','disburse_date','fund_date','num_of_borrowers')\
          .filter("lender_count = 2986").show()
  

In [21]:
##### GENDERS #####


# gender of borrower(s)
kiva_loans.select('country', 'borrower_genders','use','id').show(100, truncate = False)


In [22]:
##### number of female/male borrowers per country


# only 1 female borrower

kiva_loans.filter("borrower_genders = 'female'").groupBy('country').count().orderBy('count', ascending = False).filter("count > 5000").show()

kiva_loans.select('borrower_genders').filter("borrower_genders = 'female'").show(50,truncate = False)
  

In [23]:
# at least 1 female borrower

kiva_loans.filter(kiva_loans.borrower_genders.like('%female%')).groupBy('country').count().orderBy('count', ascending = False)\
                                                               .filter("count > 5000").show()

kiva_loans.select('borrower_genders').filter(kiva_loans.borrower_genders.like('%female%')).show(90,truncate = False)


In [24]:
# only 1 male borrower

kiva_loans.filter("borrower_genders = 'male'").groupBy('country').count().orderBy('count', ascending = False).filter("count > 5000").show()

kiva_loans.select('borrower_genders').filter("borrower_genders = 'male'").show(50,truncate = False)


In [25]:
# at least 1 male borrower
# the regex \bmale\b matches 'male' only as a whole word: 'female' is skipped, 'female, male' is kept

kiva_loans.filter(kiva_loans.borrower_genders.rlike(r'\bmale\b')).groupBy('country').count().orderBy('count', ascending = False)\
                                                                 .filter("count > 5000").show()

kiva_loans.select('borrower_genders').filter(kiva_loans.borrower_genders.rlike(r'\bmale\b')).show(50, truncate = False)
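
The gender filters depend on how the pattern is matched against the comma-separated `borrower_genders` string. The plain-Python sketch below, on made-up sample strings, compares prefix matching (like `'male%'`), substring matching (like `'%male%'`) and whole-word matching (regex `\bmale\b`). The helper names are hypothetical.

```python
import re

samples = ["male", "female", "female, male", "male, female", "female, female"]

def starts_with_male(s):
    # same idea as SQL LIKE 'male%'
    return s.startswith("male")

def contains_male(s):
    # same idea as SQL LIKE '%male%'; 'female' contains 'male' too
    return "male" in s

def has_male_word(s):
    # 'male' as a whole word only
    return re.search(r"\bmale\b", s) is not None

prefix_hits = [s for s in samples if starts_with_male(s)]
substring_hits = [s for s in samples if contains_male(s)]
word_hits = [s for s in samples if has_male_word(s)]
print(prefix_hits, substring_hits, word_hits, sep="\n")
```

Prefix matching misses groups where a male borrower is not listed first, and substring matching also counts all-female groups, so only the whole-word match answers "at least one male borrower".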


In [26]:
# number of borrowers

kiva_loans.select(avg('num_of_borrowers').alias('average')).show()

kiva_loans.select('num_of_borrowers').groupBy('num_of_borrowers').count().orderBy('count', ascending = False).show()


In [27]:
# -1 : Unknown gender

kiva_loans.select('funded_amount','use','country_code','borrower_genders','num_of_borrowers').filter("num_of_borrowers = -1").show()
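
As far as I can tell, the -1 comes from the `size(split(...))` expression used for `num_of_borrowers`: in Spark 2.x, `size` returns -1 for a null input, so loans with a missing `borrower_genders` value end up with -1. The plain-Python sketch below mirrors that behavior on made-up values; `count_borrowers` and `gender_breakdown` are hypothetical helpers.

```python
from collections import Counter

def count_borrowers(borrower_genders):
    """Number of comma-separated entries; -1 stands for a missing value."""
    if borrower_genders is None:
        return -1
    return len(borrower_genders.split(", "))

def gender_breakdown(borrower_genders):
    """Count borrowers per gender; a missing value gives an empty result."""
    if borrower_genders is None:
        return {}
    return dict(Counter(borrower_genders.split(", ")))

print(count_borrowers("female, female, male"))
print(count_borrowers(None))
print(gender_breakdown("female, female, male"))
```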


In [28]:
# max and min term in months

kiva_loans.select(max('term_in_months'),min('term_in_months')).show()

# unique values of repayment intervals

kiva_loans.select('repayment_interval').distinct().show()

# counts of repayment methods

kiva_loans.select('repayment_interval').groupBy('repayment_interval').count().orderBy('count', ascending = False).show()


In [29]:
# loans with the longest repayment term (158 months)

display(kiva_loans.select('country','region','sector','activity','use','funded_amount','loan_amount','repayment_interval','num_of_borrowers')\
                  .filter("term_in_months = 158"))


In [30]:
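# loans that received no funds at all
# display() renders the DataFrame itself and returns nothing, so .show() must not be chained onto it

display(kiva_loans.select('country','region','sector','use','loan_amount','funded_amount').filter("funded_amount = 0"))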

In [31]:
# pay off in 1 month

display(kiva_loans.select('country','region','sector','use','funded_amount','loan_amount','repayment_interval','num_of_borrowers')\
                  .filter("term_in_months = 1"))

In [32]:
# biggest fund amounts / region

kiva_loans.select('country','region','funded_amount').groupBy('country','region').agg(max('funded_amount').alias('max_amount'))\
                                                     .orderBy('max_amount', ascending = False).show(30)

# smallest fund amounts / region

kiva_loans.select('country','region','funded_amount').groupBy('country','region').agg(min('funded_amount').alias('min_amount'))\
                                                     .orderBy('min_amount', ascending = True).show(30)


In [33]:
# loans & funds per sector

sectors = kiva_loans.select('sector','funded_amount','loan_amount').groupBy('sector')\
                    .agg(count('sector').alias('count'), sum('funded_amount').alias('total_funded'), sum('loan_amount').alias('total_loans'))\
                    .orderBy('count', ascending = False)
sectors.show()

In [34]:
display(sectors)
# Agriculture, Food, Retail, Services, Personal Use, Housing, Clothing, Education, Transportation, Arts, Health, Construction, Manufacturing
# Entertainment, Wholesale

In [35]:
# loans & funds per activity

act = kiva_loans.select('activity','funded_amount','loan_amount').groupBy('activity')\
                .agg(count('activity').alias('count'), sum('funded_amount').alias('total_funded'), sum('loan_amount').alias('total_loans'))\
                .orderBy('count', ascending = False)

act.show(truncate = False)


In [36]:
# dominant activities

display(act.filter("count > 10000"))

In [37]:
# biggest fund amounts from partners

top_lenders = kiva_loans.select('partner_id','funded_amount')\
                        .groupBy('partner_id').agg(count('partner_id').alias('num_of_funds'),max('funded_amount').alias('max_amount'),
                                     sum('funded_amount').alias('total_funds'))\
                        .orderBy('max_amount',ascending = False)
    
top_lenders.show(25)

In [38]:
# Need to load loan_themes_by_region2

themes_region = spark.read.parquet('/FileStore/tables/loan_themes_by_region2.parquet')
themes_region = themes_region.drop('__index_level_0__')

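# lowercase column names
# (the loop variable is not named 'col', so it does not shadow pyspark.sql.functions.col, which is used later)

for name in themes_region.columns:
    themes_region = themes_region.withColumnRenamed(name, name.lower())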
    

In [39]:
### JOIN kiva_loans & theme_regions / we need partner's name

top_lenders_name = top_lenders.join(themes_region.select('partner_id','field_partner_name','iso'), 'partner_id', how = 'left')
top_lenders_name = top_lenders_name.distinct()

In [40]:
top_lenders_name.show(truncate = False)

In [41]:
# max amounts worldwide (from table above)

display(top_lenders_name)

In [42]:
# biggest fund to Haiti

display(kiva_loans.select('funded_amount','currency','country','region','sector','use').filter("partner_id = 315.0")\
          .orderBy('funded_amount', ascending = False))


In [43]:
partnerIds = kiva_loans.select('lender_count','partner_id','funded_amount','loan_amount','activity','country','region','currency','term_in_months',
                              'repayment_interval','disburse_date','fund_date','num_of_borrowers')\
                       .join(themes_region.select('field_partner_name','partner_id','loan_theme_type'), 'partner_id', how='left').drop('partner_id4')


In [44]:
partnerIds = partnerIds.distinct()

In [45]:
# more information for top lender based on lender_count "Yunus Social Business"

display(partnerIds.where("lender_count = 2986"))

In [46]:
# top funders

top_funders = kiva_loans.select('partner_id','funded_amount')\
                        .groupBy('partner_id').agg(count('partner_id').alias('num_of_funds'),sum('funded_amount').alias('total_funds'))\
                        .orderBy('num_of_funds',ascending = False)

top_funders.show()

In [47]:
# JOIN with theme regions to use field partner name

top_funders_name = top_funders.join(themes_region.select('partner_id','field_partner_name','iso','country'), 'partner_id', how = 'left')
top_funders_name = top_funders_name.distinct()

In [48]:
top_funders_name.orderBy('num_of_funds', ascending = False).show(truncate = False)

In [49]:
# number of funds worldwide

display(top_funders_name)

In [50]:
# most common currencies in transactions

kiva_loans.select('currency').groupBy('currency').count().orderBy('count', ascending = False).show()

In [51]:
# transactions in US dollars per country, including countries outside the United States

cur = kiva_loans.select('country','currency').filter("currency = 'USD'").groupBy('country','currency').count().orderBy('count', ascending = False)
cur.show(50, truncate = False)

In [52]:
# > 1000 funds in USD

display(cur.filter("count>1000"))

In [53]:
#### DATES

kiva_loans.select('post_date', 'disburse_date','fund_date').show(50)

In [54]:
# total loans and funds per year

years = kiva_loans.select(year('disburse_date').alias('year'),'funded_amount','loan_amount')\
                  .groupBy('year').agg(sum('funded_amount').alias('total_funds'),sum('loan_amount').alias('total_loans'))\
                  .orderBy('year')

years.show()


In [55]:
display(years)

In [56]:
kiva_loans.describe(['disburse_date']).show()

In [57]:
# disbursed date = null

display(kiva_loans.select('funded_amount','loan_amount','country','disburse_date','fund_date','use').where(col("disburse_date").isNull()))

In [58]:
# fully or partly funded but unknown if loan disbursed

kiva_loans.select('funded_amount','loan_amount','country','disburse_date',
                  'fund_date','post_date','use').where(col('disburse_date').isNull()).filter("funded_amount > 0").show(50)

In [59]:
# never funded

kiva_loans.select('funded_amount','loan_amount','country','disburse_date','fund_date','post_date','use')\
          .where(col('disburse_date').isNull()).filter("funded_amount = 0").show(50)

In [60]:
#################################################################################################################
# 2 / Kiva MPI Region locations
#################################################################################################################


In [61]:
# 2
kiva_regions = spark.read.parquet('/FileStore/tables/kiva_mpi_region_locations.parquet')
kiva_regions = kiva_regions.drop('__index_level_0__')

In [62]:
# count rows

kiva_regions.count()

In [63]:
display(kiva_regions.describe())

In [64]:
# get info from MPI and locations

kiva_regions.describe(['MPI']).show()
kiva_regions.select(countDistinct('LocationName').alias('Distinct Locations')).show()

In [65]:
# MPI values per country in alphabetical order

kiva_regions.select('MPI','country').groupBy('country').agg(format_number(avg('MPI'),3).alias('average_MPI'),
                                                            format_number(max('MPI'),3).alias('Max_MPI'),
                                                            format_number(min('MPI'),3).alias('Min_MPI'))\
                                                            .orderBy('country').show()

In [66]:
# max MPI per country

display(kiva_regions)

In [67]:
######################################################################################################################################
# 3 / Loan Theme IDs
######################################################################################################################################

In [68]:
# 3
loan_themes = spark.read.parquet('/FileStore/tables/loan_theme_ids2.parquet')
loan_themes = loan_themes.drop('__index_level_0__')

In [69]:
print(loan_themes.columns)

# lowercase dataframe's column names
# (the loop variable is not named 'col', so it does not shadow pyspark.sql.functions.col)

for name in loan_themes.columns:
    loan_themes = loan_themes.withColumnRenamed(name, name.lower())


In [70]:
loan_themes.show(10)

In [71]:
###################################################################################################################################
# 4 / Loan Themes by Region
###################################################################################################################################

In [72]:
# 4
themes_region = spark.read.parquet('/FileStore/tables/loan_themes_by_region2.parquet')
themes_region = themes_region.drop('__index_level_0__')

In [73]:
print(themes_region.columns)

# lowercase dataframe's column names
# (the loop variable is not named 'col', so it does not shadow pyspark.sql.functions.col)

for name in themes_region.columns:
    themes_region = themes_region.withColumnRenamed(name, name.lower())
    

In [74]:
display(themes_region.take(10))

In [75]:
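# round() keeps the average numeric, so ordering and filtering compare numbers;
# format_number() would return a string, and strings sort lexicographically
rural_areas = themes_region.select('rural_pct','country','number','loan_theme_type','sector')\
                           .groupBy('country')\
                           .agg(round(avg('rural_pct'),2).alias('average_rural_pct'),
                                max('loan_theme_type').alias('top_loan_theme_type'),
                                max('sector').alias('sector'),
                                max('number'))\
                           .orderBy('average_rural_pct', ascending = False)

# max(number): Number of loans funded in this LocationName and this loan theme
# note: max() on a string column (loan_theme_type, sector) returns the alphabetically last value, not the most frequent one

rural_areas.show(truncate = False)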
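
One thing to watch when ordering by a value produced by `format_number`: it returns a string, and strings are compared character by character. The plain-Python sketch below shows the effect on three made-up percentages; it is an illustration only and does not use Spark.

```python
values = [9.5, 100.0, 90.25]

# format the numbers as text with two decimals, similar to format_number(x, 2)
formatted = ["{:,.2f}".format(v) for v in values]

# descending order on text versus descending order on numbers
sorted_as_text = sorted(formatted, reverse=True)
sorted_as_numbers = sorted(values, reverse=True)

print(sorted_as_text)
print(sorted_as_numbers)
```

Sorted as text, `100.00` lands last because it starts with `1`, so a "top countries" list ordered this way would bury the highest percentage at the bottom.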

In [76]:
# shorten 'The Democratic Republic of the Congo'

rural_areas = rural_areas.withColumn('countries', regexp_replace('country', 'The Democratic Republic of the Congo', 'Congo'))

In [77]:
# countries with rural percentage over 90%

display(rural_areas.filter("average_rural_pct > 90"))

In [78]:
# Chart 1
# Average loan amounts per week of year

display(kiva_loans.select(weekofyear('disburse_date').alias('disburse_week'),'loan_amount').orderBy('disburse_date', ascending = True))

In [79]:
# Chart 2
# Average loan amounts per month 

display(kiva_loans.select(month('disburse_date').alias('disburse_month'),'loan_amount').orderBy('disburse_date', ascending = True))

In [80]:
# Chart 3
# Average loan amounts per day of year

display(kiva_loans.select(dayofyear('disburse_date').alias('disburse_day'),'loan_amount').orderBy('disburse_date', ascending = True))

In [81]:
# Chart 4
# Average fund amounts per day of year

display(kiva_loans.select(dayofyear('disburse_date').alias('disburse_day'),'funded_amount').orderBy('disburse_date', ascending = True))

In [82]:
# Many funds on the last day of the year

display(kiva_loans.select(dayofyear('disburse_date').alias('disburse_date'),'loan_amount','funded_amount').orderBy('disburse_date', ascending = False))

Charts 1-3: There are many null values, so we get a peak at the beginning of the x-axis.

Charts 3 & 4: A lot of funding on New Year's Eve!
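
The chart keys can be reproduced in plain Python to see where the nulls and the year-end values come from. This is a sketch on made-up dates, assuming ISO `YYYY-MM-DD` strings like the ones in `disburse_date`; `day_of_year` and `week_of_year` are hypothetical helpers that mimic Spark's `dayofyear` and `weekofyear` (ISO 8601 weeks) and need Python 3.7 or later.

```python
from datetime import date

def day_of_year(value):
    """Day number within the year (1..366); None stays None."""
    if value is None:
        return None
    return date.fromisoformat(value).timetuple().tm_yday

def week_of_year(value):
    """ISO 8601 week number; None stays None."""
    if value is None:
        return None
    return date.fromisoformat(value).isocalendar()[1]

# made-up dates, including a missing one
for d in ["2017-12-31", "2016-12-31", "2018-01-01", None]:
    print(d, day_of_year(d), week_of_year(d))
```

A missing date produces a missing key, which is why the null rows form their own group in the charts, and December 31 maps to day 365 or 366 depending on whether the year is a leap year.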