# HW3 - Big Data Management

### By Natalia Beltrán & Mikel Gallo 

# Data Management Backbone 

In [338]:
## Imports 
import os 
import sys 
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import re
from pyspark.sql.functions import explode, col, count 


In [339]:
appName = "BigDataHW3"
master = "local[*]" # Spark will use all cores (*) available
if not 'spark' in globals(): # This 'trick' makes sure the SparkContext sc is initialized exactly once
    conf = SparkConf().setAppName(appName).setMaster(master)
    spark = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

## A Tasks for the Data Management Backbone 

### A.1 Explore the data and choose the KPIs

Creating the necessary local file system

If notebook is run, please remember to adapt all paths to own local paths

In [341]:
## Creating Landing zone file data exploration 
landing_zone_path = '/Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/LandingZone' ## landing zone
formatted_zone_path = '/Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/FormattedZone' ## formatted zone
exploitation_zone_path = '/Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone' ## exploitation zone


## directories 
os.makedirs(landing_zone_path, exist_ok=True)
os.makedirs(formatted_zone_path, exist_ok=True)
os.makedirs(exploitation_zone_path, exist_ok=True)


## Files
'''The files chosen for the exploration is the data from the Cultural sites, Income and Price open data. These were manually moved to the LandingZone'''

'The files chosen for the exploration is the data from the Cultural sites, Income and Price open data. These were manually moved to the LandingZone'

### A.2 Data Formatting Process

In [342]:
## for the purpose of cleaning the text data 

def comprehensive_clean(s):
    if isinstance(s, str):
        s = s.strip().lower()  # Trim and convert to lowercase
        s = re.sub(r'[^a-zA-Z\s]', '', s)  # Remove non-alphanumeric characters
        s = re.sub(r'\s+', ' ', s)  # Replace multiple spaces with a single space
        return s
    return s

**Cultural Sites**

**Landing Zone** Culture data

In [343]:
## grabing from landing zone 
cultural_df = pd.read_csv(os.path.join(landing_zone_path, 'cultural-sites/2023-04-20.csv'))
cultural_df.head()

Unnamed: 0,addresses_roadtype_name,addresses_end_street_number,values_attribute_name,addresses_road_name,values_category,addresses_zip_code,values_value,addresses_town,geo_epgs_4326_y,geo_epgs_4326_x,...,addresses_road_id,created,geo_epgs_25831_y,institution_name,modified,values_description,values_id,addresses_neighborhood_name,values_outstanding,values_attribute_id
0,,,E-mail,C Esteve Terradas,Informació d'interès,8023,pvirgili@perevirgili.catsalut.net,BARCELONA,2.140266,41.414955,...,116901,1988-03-14T00:00:00,4585179.0,,2022-09-21T19:13:34.303475,,40212.0,Vallcarca i els Penitents,True,100002.0
1,,,Web,C Fulton,Informació d'interès,8032,http://lameva.barcelona.cat/horta-guinardo/ca/...,BARCELONA,2.161502,41.429994,...,137209,2015-05-26T12:24:46,4586832.0,,2021-09-09T02:14:37.921873,,95717.0,Horta,True,100003.0
2,,,,Pl Lesseps,,8023,,BARCELONA,2.151087,41.407712,...,123408,2015-05-26T13:26:20,4584366.0,,2021-09-09T02:14:44.349060,,,la Vila de Gràcia,,
3,,,E-mail,C Ramon Trias Fargas,Informació d'interès,8005,info@idec.upf.edu,BARCELONA,2.190872,41.390029,...,132908,2015-05-26T17:06:23,4582371.0,,2022-09-09T12:06:57.108133,,57030.0,la Vila Olímpica del Poblenou,True,100002.0
4,,,Web,Pg Vall d'Hebron,Informació d'interès,8035,http://www.ub.edu/campusmundet/,BARCELONA,2.147158,41.435755,...,352507,2015-05-26T17:35:12,4587483.0,,2022-09-09T12:07:54.518743,,95719.0,Montbau,True,100003.0


In [344]:
cultural_df.columns

Index(['addresses_roadtype_name', 'addresses_end_street_number',
       'values_attribute_name', 'addresses_road_name', 'values_category',
       'addresses_zip_code', 'values_value', 'addresses_town',
       'geo_epgs_4326_y', 'geo_epgs_4326_x', 'addresses_district_name',
       'geo_epgs_25831_x', 'addresses_start_street_number', 'register_id',
       'institution_id', 'addresses_main_address', 'addresses_district_id',
       'addresses_roadtype_id', 'addresses_type', 'addresses_neighborhood_id',
       '_id', 'name', 'addresses_road_id', 'created', 'geo_epgs_25831_y',
       'institution_name', 'modified', 'values_description', 'values_id',
       'addresses_neighborhood_name', 'values_outstanding',
       'values_attribute_id'],
      dtype='object')

In [345]:
cultural_dfinal = cultural_df[['addresses_road_name', 'values_category', 'values_attribute_name','addresses_type', 'addresses_zip_code', 'addresses_town', 'addresses_district_name', 'addresses_neighborhood_name', 'values_id']]
cultural_dfinal['addresses_neighborhood_name'] = cultural_dfinal['addresses_neighborhood_name'].apply(comprehensive_clean) 

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  cultural_dfinal['addresses_neighborhood_name'] = cultural_dfinal['addresses_neighborhood_name'].apply(comprehensive_clean)


**Formatted Zone** Culture data

In [346]:
## Convert to PySpark -> save to Formatted Zone 
df_culture = spark.createDataFrame(cultural_dfinal)
df_culture.write.partitionBy("addresses_district_name").parquet(os.path.join(formatted_zone_path, "CulturalSites/cultural_data.parquet"), mode='overwrite')
# df_culture.write.partitionBy("addresses_district_name").parquet(os.path.join(formatted_zone_path, "cultural_data.parquet"), mode='overwrite')

                                                                                

**A.4 Spark Query** Cultural

In [347]:
cultural_formatted_path = os.path.join(formatted_zone_path, "CulturalSites/cultural_data.parquet")
cultural_data = spark.read.parquet(cultural_formatted_path)

# Register DataFrame as a temporary view
cultural_data.createOrReplaceTempView("cultural_sites")

## aggregate by district number of sites
query_cultural = """
SELECT addresses_district_name AS District, COUNT(*) AS NumberOfSites
FROM cultural_sites
GROUP BY addresses_district_name
ORDER BY NumberOfSites DESC
"""

result_df_cultural = spark.sql(query_cultural)
result_df_cultural.show(10, truncate=False)

+-------------------+-------------+
|District           |NumberOfSites|
+-------------------+-------------+
|Gràcia             |135          |
|Ciutat Vella       |124          |
|Sarrià-Sant Gervasi|119          |
|Sant Andreu        |98           |
|Sants-Montjuïc     |91           |
|Eixample           |88           |
|Les Corts          |75           |
|Sant Martí         |60           |
|Horta-Guinardó     |52           |
|Nou Barris         |29           |
+-------------------+-------------+



**Price Open Data**

**Landing Zone** Price opendata data

In [348]:
price_file_path = os.path.join(landing_zone_path, 'price_opendata/price_opendata_neighborhood.json')
price_df = spark.read.json(price_file_path)
price_df.show(5, truncate=False)

+---+-----------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------+
|_id|district_id|district_name|info                                                                                                                                                                                                                                                                     |neigh_name                           |
+---+-----------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------

In [349]:
price_df.printSchema()

root
 |-- _id: long (nullable = true)
 |-- district_id: long (nullable = true)
 |-- district_name: string (nullable = true)
 |-- info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Amount: double (nullable = true)
 |    |    |-- PerMeter: double (nullable = true)
 |    |    |-- diffAmount: double (nullable = true)
 |    |    |-- diffPerMeter: double (nullable = true)
 |    |    |-- usedAmount: double (nullable = true)
 |    |    |-- usedPerMeter: double (nullable = true)
 |    |    |-- year: long (nullable = true)
 |-- neigh_name : string (nullable = true)



In [350]:
## need to take out the information that is in the info column and create its own columns with it 

flattened_pice_df = price_df.select("_id", "district_id", "district_name", col("neigh_name ").alias("neigh_name"), explode("info").alias("info"))


price_df_expanded = flattened_pice_df.select(
    "_id", 
    "district_id", 
    "district_name", 
    "neigh_name",
    "info.year", 
    "info.Amount", 
    "info.PerMeter", 
    "info.diffAmount", 
    "info.diffPerMeter", 
    "info.usedAmount", 
    "info.usedPerMeter"
)

price_df_expanded.show(5, truncate=False)

+---+-----------+-------------+----------+----+------+--------+----------+------------+----------+------------+
|_id|district_id|district_name|neigh_name|year|Amount|PerMeter|diffAmount|diffPerMeter|usedAmount|usedPerMeter|
+---+-----------+-------------+----------+----+------+--------+----------+------------+----------+------------+
|1  |1          |Ciutat Vella |el Raval  |2013|97.0  |1726.5  |NULL      |NULL        |97.0      |1726.5      |
|1  |1          |Ciutat Vella |el Raval  |2014|141.7 |2087.6  |99.1      |2534.3      |143.1     |2073.5      |
|1  |1          |Ciutat Vella |el Raval  |2015|193.8 |2401.9  |NULL      |NULL        |193.8     |2401.9      |
|1  |1          |Ciutat Vella |el Raval  |2016|181.0 |2805.2  |NULL      |NULL        |180.7     |2798.6      |
|1  |1          |Ciutat Vella |el Raval  |2017|240.3 |3469.9  |292.5     |3633.1      |240.0     |3468.9      |
+---+-----------+-------------+----------+----+------+--------+----------+------------+----------+------

**Formatted Zone** Price opendata data

In [351]:
def row_to_dict(row):
    return row.asDict()

dict_pice_df = price_df_expanded.rdd.map(row_to_dict)

In [352]:
price_df_expanded = spark.createDataFrame(dict_pice_df)

output_parquet_path = os.path.join(formatted_zone_path, "PriceOpenData/pice_data_expanded.parquet")
# output_parquet_path = os.path.join(formatted_zone_path, "pice_data_expanded.parquet")
price_df_expanded.write.partitionBy("district_name").parquet(output_parquet_path, mode='overwrite')


**A.4 Spark Query** Price Open Data 

In [353]:
## register DataFrame as a temporary view
price_df_expanded.createOrReplaceTempView("price_data")

## aggregate Amount by district_name and year
query_price = """
SELECT district_name AS District, year AS Year, SUM(Amount) AS TotalAmount
FROM price_data
GROUP BY district_name, year
ORDER BY District, Year
"""

result_df_price = spark.sql(query_price)
result_df_price.show(10, truncate=False)



+------------+----+------------------+
|District    |Year|TotalAmount       |
+------------+----+------------------+
|Ciutat Vella|2013|675.3             |
|Ciutat Vella|2014|778.5999999999999 |
|Ciutat Vella|2015|979.1000000000001 |
|Ciutat Vella|2016|1109.1            |
|Ciutat Vella|2017|1225.0            |
|Eixample    |2013|1485.9            |
|Eixample    |2014|1605.1000000000001|
|Eixample    |2015|1785.9            |
|Eixample    |2016|1865.3            |
|Eixample    |2017|2185.7000000000003|
+------------+----+------------------+
only showing top 10 rows



**Income**

**Landing Zone** Income data

In [354]:
income_directory =  os.path.join(landing_zone_path, 'income/')
income_files = [file for file in os.listdir(income_directory) if file.endswith('.csv')]

## looked at data to see how to move to formatted section 
dfs = [pd.read_csv(os.path.join(income_directory, file)) for file in income_files]
income_df = pd.concat(dfs, ignore_index=True)
income_df.head(5)

Unnamed: 0,Any,Codi_Districte,Nom_Districte,Codi_Barri,Nom_Barri,Població,Índex RFD Barcelona = 100
0,2011,1,Ciutat Vella,1,el Raval,48485,65.0
1,2011,1,Ciutat Vella,2,el Barri Gòtic,17257,98.8
2,2011,1,Ciutat Vella,3,la Barceloneta,15674,73.1
3,2011,1,Ciutat Vella,4,"Sant Pere, Santa Caterina i la Ribera",22632,86.1
4,2011,2,Eixample,5,el Fort Pienc,32348,97.9


In [355]:
income_df['Any'].unique() ## checked that all the csv from all separate years got combined 

array([2011, 2013, 2010, 2007, 2012, 2014, 2016, 2009, 2015, 2017, 2008])

In [356]:
## cleaning text just in case

# Rename columns
income_df = income_df.rename(columns={"Població": "Population", "Índex RFD Barcelona = 100": "Index"})


income_df['Nom_Barri'] = income_df['Nom_Barri'].apply(comprehensive_clean) 

**Formatted Zone** Income data

In [357]:
df_income = spark.createDataFrame(income_df)
df_income.write.partitionBy("Nom_Districte").parquet(os.path.join(formatted_zone_path, "Income/income_data.parquet"), mode='overwrite')

                                                                                

**A.4 Spark Query** Income data

In [358]:
## register DataFrame as a temporary view
df_income.createOrReplaceTempView("income_data")

## aggregate total population by district and year 
query_income = """
SELECT Nom_Districte AS District, Any AS Year, SUM(Population) AS TotalPopulation
FROM income_data
GROUP BY District, Year
ORDER BY District, Year
"""

result_df_income = spark.sql(query_income)
result_df_income.show(10, truncate=False)


+------------+----+---------------+
|District    |Year|TotalPopulation|
+------------+----+---------------+
|Ciutat Vella|2007|113034         |
|Ciutat Vella|2008|111636         |
|Ciutat Vella|2009|109847         |
|Ciutat Vella|2010|106849         |
|Ciutat Vella|2011|104048         |
|Ciutat Vella|2012|105216         |
|Ciutat Vella|2013|103944         |
|Ciutat Vella|2014|102237         |
|Ciutat Vella|2015|100227         |
|Ciutat Vella|2016|100451         |
+------------+----+---------------+
only showing top 10 rows



### A.3 Move Data to the Exploitation Zone 

<h3>Cultural Data</h3>

In [359]:
## inspecting data from formatted zone 
Cultural = spark.read.parquet('FormattedZone/CulturalSites/')
print("Partitions: ",Cultural.rdd.glom().collect())
Cultural.show(5)

Partitions:  [[Row(addresses_road_name='Pg Sant Gervasi', values_category='NaN', values_attribute_name='NaN', addresses_type=nan, addresses_zip_code=8022, addresses_town='BARCELONA', addresses_neighborhood_name='sant gervasi la bonanova', values_id=nan, addresses_district_name='Sarrià-Sant Gervasi'), Row(addresses_road_name='C Sant Gervasi de Cassoles', values_category='NaN', values_attribute_name='NaN', addresses_type=nan, addresses_zip_code=8022, addresses_town='BARCELONA', addresses_neighborhood_name='sant gervasi la bonanova', values_id=nan, addresses_district_name='Sarrià-Sant Gervasi'), Row(addresses_road_name='C Canet', values_category='NaN', values_attribute_name='NaN', addresses_type=nan, addresses_zip_code=8017, addresses_town='BARCELONA', addresses_neighborhood_name='sarri', values_id=nan, addresses_district_name='Sarrià-Sant Gervasi'), Row(addresses_road_name='C Bellesguard', values_category='NaN', values_attribute_name='NaN', addresses_type=nan, addresses_zip_code=8022, ad

In [360]:
## changing column name to match those of other datasets
Cultural_final = Cultural.withColumnRenamed("addresses_district_name", "District") \
    .withColumnRenamed("addresses_neighborhood_name", "Neighborhood_Name") \
        .withColumnRenamed("addresses_road_name", "Addresses_Road_Name") \
            .withColumnRenamed("values_category", "Values_Category") \
                .withColumnRenamed("values_attribute_name", "Values_Attribute_Name") \
                    .withColumnRenamed("addresses_type", "Addresses_Type") \
                        .withColumnRenamed("addresses_zip_code", "Addresses_Zip_Code") \
                            .withColumnRenamed("addresses_town", "Addresses_Town") \
                                .withColumnRenamed("values_id", "Values_Id") 

    
Cultural_final.show(5)

+--------------------+---------------+---------------------+--------------+------------------+--------------+--------------------+---------+-------------------+
| Addresses_Road_Name|Values_Category|Values_Attribute_Name|Addresses_Type|Addresses_Zip_Code|Addresses_Town|   Neighborhood_Name|Values_Id|           District|
+--------------------+---------------+---------------------+--------------+------------------+--------------+--------------------+---------+-------------------+
|     Pg Sant Gervasi|            NaN|                  NaN|           NaN|              8022|     BARCELONA|sant gervasi la b...|      NaN|Sarrià-Sant Gervasi|
|C Sant Gervasi de...|            NaN|                  NaN|           NaN|              8022|     BARCELONA|sant gervasi la b...|      NaN|Sarrià-Sant Gervasi|
|             C Canet|            NaN|                  NaN|           NaN|              8017|     BARCELONA|               sarri|      NaN|Sarrià-Sant Gervasi|
|       C Bellesguard|            

**Exploitation zone** Cultural data

In [361]:
## saving data to exploitation zone (both as parquet and csv to help with section B)
parquet_output_path_culture = os.path.join(exploitation_zone_path, "CulturalSites/cultural_data_final.parquet")
csv_output_path_culture = os.path.join(exploitation_zone_path, "CulturalSites/cultural_data_final.csv")

## saving parquet 
Cultural_final.write.partitionBy("District").parquet(parquet_output_path_culture, mode='overwrite')

## saving last to csv 
Cultural_final.coalesce(1).write.option("header", "true").mode('overwrite').csv(csv_output_path_culture)
print(f"Data saved to {parquet_output_path_culture} and {csv_output_path_culture}")


Data saved to /Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone/CulturalSites/cultural_data_final.parquet and /Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone/CulturalSites/cultural_data_final.csv


<h3>Price opendata data</h3>

In [362]:
## inspecting data from formatted zone 
Price = spark.read.parquet('FormattedZone/PriceOpenData/')
print("Partitions: ",Price.rdd.glom().collect())
Price.show(5)

Partitions:  [[Row(Amount=275.5, PerMeter=2800.0, _id=63, diffAmount=348.0, diffPerMeter=3331.4, district_id=10, neigh_name="el Camp de l'Arpa del Clot", usedAmount=274.5, usedPerMeter=2793.3, year=2013, district_name='Sant Martí'), Row(Amount=252.3, PerMeter=2618.5, _id=63, diffAmount=250.2, diffPerMeter=2601.3, district_id=10, neigh_name="el Camp de l'Arpa del Clot", usedAmount=252.3, usedPerMeter=2618.7, year=2014, district_name='Sant Martí'), Row(Amount=234.5, PerMeter=2818.1, _id=63, diffAmount=None, diffPerMeter=None, district_id=10, neigh_name="el Camp de l'Arpa del Clot", usedAmount=234.5, usedPerMeter=2818.1, year=2015, district_name='Sant Martí'), Row(Amount=269.4, PerMeter=3331.6, _id=63, diffAmount=278.7, diffPerMeter=4332.4, district_id=10, neigh_name="el Camp de l'Arpa del Clot", usedAmount=269.2, usedPerMeter=3306.1, year=2016, district_name='Sant Martí'), Row(Amount=341.8, PerMeter=3988.8, _id=63, diffAmount=420.4, diffPerMeter=4871.0, district_id=10, neigh_name="el Cam

In [363]:
## cleaning dataframe

Price_final = Price.withColumnRenamed("district_name", "District") \
    .withColumnRenamed("neigh_name", "Neighborhood_Name") \
        .withColumnRenamed("usedAmount", "Used_Amount") \
            .withColumnRenamed("usedPerMeter", 'Used_Per_Meter') \
                .withColumnRenamed("year", 'Year') \
                    .withColumnRenamed("diffPerMeter", 'diff_Per_Meter') \
                        .withColumnRenamed("diffAmount", 'diff_Amount') \
                            .withColumnRenamed("PerMeter", 'Per_Meter') \
                                .withColumnRenamed("district_id", 'District_Id') 

Price_final.show(5)

+------+---------+---+-----------+--------------+-----------+--------------------+-----------+--------------+----+----------+
|Amount|Per_Meter|_id|diff_Amount|diff_Per_Meter|District_Id|   Neighborhood_Name|Used_Amount|Used_Per_Meter|Year|  District|
+------+---------+---+-----------+--------------+-----------+--------------------+-----------+--------------+----+----------+
| 275.5|   2800.0| 63|      348.0|        3331.4|         10|el Camp de l'Arpa...|      274.5|        2793.3|2013|Sant Martí|
| 252.3|   2618.5| 63|      250.2|        2601.3|         10|el Camp de l'Arpa...|      252.3|        2618.7|2014|Sant Martí|
| 234.5|   2818.1| 63|       NULL|          NULL|         10|el Camp de l'Arpa...|      234.5|        2818.1|2015|Sant Martí|
| 269.4|   3331.6| 63|      278.7|        4332.4|         10|el Camp de l'Arpa...|      269.2|        3306.1|2016|Sant Martí|
| 341.8|   3988.8| 63|      420.4|        4871.0|         10|el Camp de l'Arpa...|      336.3|        3927.3|2017|Sant

**Exploitation zone** Price Open Data data

In [364]:
## saving data to exploitation zone (both as parquet and csv to help with section B)
parquet_output_path_price = os.path.join(exploitation_zone_path, "PriceOpenData/price_data_final.parquet")
csv_output_path_price = os.path.join(exploitation_zone_path, "PriceOpenData/price_data_final.csv")

## saving parquet 
Price_final.write.partitionBy("District").parquet(parquet_output_path_price, mode='overwrite')

## saving last to csv 
Price_final.coalesce(1).write.option("header", "true").mode('overwrite').csv(csv_output_path_price)
print(f"Data saved to {parquet_output_path_price} and {csv_output_path_price}")

Data saved to /Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone/PriceOpenData/price_data_final.parquet and /Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone/PriceOpenData/price_data_final.csv


<h3>Income data</h3>

In [365]:
## inspecting data from formatted zone 
Income = spark.read.parquet('FormattedZone/Income/')
print("Partitions: ",Income.rdd.glom().collect())
Income.show(5)

Partitions:  [[Row(Any=2017, Codi_Districte=8, Codi_Barri=44, Nom_Barri='vilapicina i la torre llobeta', Population=25618, Index='63.8', Nom_Districte='Nou Barris'), Row(Any=2017, Codi_Districte=8, Codi_Barri=45, Nom_Barri='porta', Population=25046, Index='64.4', Nom_Districte='Nou Barris'), Row(Any=2017, Codi_Districte=8, Codi_Barri=46, Nom_Barri='el tur de la peira', Population=15506, Index='51.9', Nom_Districte='Nou Barris'), Row(Any=2017, Codi_Districte=8, Codi_Barri=47, Nom_Barri='can peguera', Population=2233, Index='51.5', Nom_Districte='Nou Barris'), Row(Any=2017, Codi_Districte=8, Codi_Barri=48, Nom_Barri='la guineueta', Population=15247, Index='53.8', Nom_Districte='Nou Barris'), Row(Any=2017, Codi_Districte=8, Codi_Barri=49, Nom_Barri='canyelles', Population=6863, Index='52.2', Nom_Districte='Nou Barris'), Row(Any=2017, Codi_Districte=8, Codi_Barri=50, Nom_Barri='les roquetes', Population=15648, Index='49.7', Nom_Districte='Nou Barris'), Row(Any=2017, Codi_Districte=8, Codi_

In [366]:
## cleaning dataframe
Income_final = Income.drop("Nom_Barri")

Income_final = Income_final.withColumnRenamed("Any", "Year") \
    .withColumnRenamed("Nom_Districte", "District") \
        .withColumnRenamed("Codi_Barri", "Neighborhood Code") \
            .withColumnRenamed("Codi_Districte", "District Code")

Income_final.show(5)

+----+-------------+-----------------+----------+-----+----------+
|Year|District Code|Neighborhood Code|Population|Index|  District|
+----+-------------+-----------------+----------+-----+----------+
|2017|            8|               44|     25618| 63.8|Nou Barris|
|2017|            8|               45|     25046| 64.4|Nou Barris|
|2017|            8|               46|     15506| 51.9|Nou Barris|
|2017|            8|               47|      2233| 51.5|Nou Barris|
|2017|            8|               48|     15247| 53.8|Nou Barris|
+----+-------------+-----------------+----------+-----+----------+
only showing top 5 rows



**Exploitation zone** Income data

In [367]:
## saving data to exploitation zone 
parquet_output_path_income = os.path.join(exploitation_zone_path, "Income/income_data_final.parquet")
csv_output_path_income = os.path.join(exploitation_zone_path, "Income/income_data_final.csv")


In [368]:
## saving parquet 
Income_final.write.partitionBy("District").parquet(parquet_output_path_income, mode='overwrite')

## saving last to csv 
Income_final.coalesce(1).write.option("header", "true").mode('overwrite').csv(csv_output_path_income)
print(f"Data saved to {parquet_output_path_income} and {csv_output_path_income}")

Data saved to /Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone/Income/income_data_final.parquet and /Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone/Income/income_data_final.csv


**Combination of data** Income and Price 

In [369]:
# Join dfIdealista_transformed and dfIncome_transformed by year and district using inner join
dfJoined_priceIncome = Price_final.join(Income_final, ["Year", "District"], "inner")
dfJoined_priceIncome.show(5)

+----+----------+------+---------+---+-----------+--------------+-----------+-----------------+-----------+--------------+-------------+-----------------+----------+-----+
|Year|  District|Amount|Per_Meter|_id|diff_Amount|diff_Per_Meter|District_Id|Neighborhood_Name|Used_Amount|Used_Per_Meter|District Code|Neighborhood Code|Population|Index|
+----+----------+------+---------+---+-----------+--------------+-----------+-----------------+-----------+--------------+-------------+-----------------+----------+-----+
|2017|Nou Barris| 176.5|   2243.7| 55|       NULL|          NULL|          8|         Vallbona|      176.5|        2243.7|            8|               44|     25618| 63.8|
|2017|Nou Barris|  77.1|   1216.1| 54|       NULL|          NULL|          8| Ciutat Meridiana|       77.1|        1216.1|            8|               44|     25618| 63.8|
|2017|Nou Barris|  97.0|   1380.5| 53|       NULL|          NULL|          8|       Torre Baró|       97.0|        1380.5|            8|    

In [370]:
## saving data to exploitation zone 
parquet_output_path_priceIncome = os.path.join(exploitation_zone_path, "Price_Income/PriceIncome_data_final.parquet")
csv_output_path_priceIncome = os.path.join(exploitation_zone_path, "Price_Income/PriceIncome_data_final.csv")

## saving parquet 
dfJoined_priceIncome.write.partitionBy("District").parquet(parquet_output_path_priceIncome, mode='overwrite')

## saving last to csv 
dfJoined_priceIncome.coalesce(1).write.option("header", "true").mode('overwrite').csv(csv_output_path_priceIncome)
print(f"Data saved to {parquet_output_path_priceIncome} and {csv_output_path_priceIncome}")

Data saved to /Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone/Price_Income/PriceIncome_data_final.parquet and /Users/nataliabeltran/Downloads/BSE_courses/Term_3_courses/23D020_Big_Data_Management_DS/big_data_proj3/ExploitationZone/Price_Income/PriceIncome_data_final.csv


# A.4 Validate the Data (queries)

The Spark queries for the exploitation zone for each of the data sets can be found here. The queries for the formatted zone are in the A.2 Formatted section under their respective subsections. 

<h3>Validation queries Cultural data</h3>



In [371]:
## loading data from exploitation culture data
cultural_exploitation = spark.read.parquet(parquet_output_path_culture)
cultural_exploitation.createOrReplaceTempView("cultural_data_exploitation")

query_cultural_analytics = """
SELECT District, COUNT(*) AS TotalEntries
FROM cultural_data_exploitation
GROUP BY District
ORDER BY District
"""

result_cultural_analytics = spark.sql(query_cultural_analytics)
result_cultural_analytics.show(10, truncate=False)




+-------------------+------------+
|District           |TotalEntries|
+-------------------+------------+
|Ciutat Vella       |124         |
|Eixample           |88          |
|Gràcia             |135         |
|Horta-Guinardó     |52          |
|Les Corts          |75          |
|Nou Barris         |29          |
|Sant Andreu        |98          |
|Sant Martí         |60          |
|Sants-Montjuïc     |91          |
|Sarrià-Sant Gervasi|119         |
+-------------------+------------+



<h3>Validation queries Price Open Data data </h3>

In [372]:
## loading from exploitation price data
price_exploitation = spark.read.parquet(parquet_output_path_price)
price_exploitation.createOrReplaceTempView("cultural_data_exploitation")

price_exploitation.createOrReplaceTempView("price_data")

query_price_analytics = """
SELECT District, Year, AVG(Used_Amount) AS AvgUsedAmount
FROM price_data
GROUP BY District, Year
ORDER BY District, Year
"""

result_price_analytics = spark.sql(query_price_analytics)
result_price_analytics.show(10, truncate=False)


+------------+----+------------------+
|District    |Year|AvgUsedAmount     |
+------------+----+------------------+
|Ciutat Vella|2013|147.75            |
|Ciutat Vella|2014|190.8             |
|Ciutat Vella|2015|244.50000000000003|
|Ciutat Vella|2016|270.275           |
|Ciutat Vella|2017|293.15            |
|Eixample    |2013|232.33333333333334|
|Eixample    |2014|266.5             |
|Eixample    |2015|292.31666666666666|
|Eixample    |2016|308.3666666666667 |
|Eixample    |2017|357.63333333333327|
+------------+----+------------------+
only showing top 10 rows



<h3>Validation queries: Income data</h3>

In [373]:
## loading from exploitation income data 
income_exploitation = spark.read.parquet(parquet_output_path_income)
income_exploitation.createOrReplaceTempView("cultural_data_exploitation")

income_exploitation.createOrReplaceTempView("income_data")


query_income_analytics = """
SELECT District, Year, AVG(Index) AS AvgIndex
FROM income_data
GROUP BY District, Year
ORDER BY District, Year
"""

result_income_analytics = spark.sql(query_income_analytics)
result_income_analytics.show(10, truncate=False)

+------------+----+-----------------+
|District    |Year|AvgIndex         |
+------------+----+-----------------+
|Ciutat Vella|2007|74.52499999999999|
|Ciutat Vella|2008|72.775           |
|Ciutat Vella|2009|77.375           |
|Ciutat Vella|2010|79.525           |
|Ciutat Vella|2011|80.75            |
|Ciutat Vella|2012|80.85000000000001|
|Ciutat Vella|2013|84.3             |
|Ciutat Vella|2014|85.35            |
|Ciutat Vella|2015|89.32499999999999|
|Ciutat Vella|2016|91.925           |
+------------+----+-----------------+
only showing top 10 rows



## B Tasks for the Data Analysis Backbone
### B.1 Descriptive Analysis and Dashboarding 