## Berlin Airbnb Data
Investigating Airbnb activity in Berlin, Germany

In [10]:
object datasets {
    val reviews_summary = "reviews_summary.csv"
};

defined object datasets


In [2]:
// load dataset reviews_summary
val ReviewsSummaryDF = spark.read.option("inferSchema", true)
                                 .option("header", "true")
                                 .csv(datasets.reviews_summary)

ReviewsSummaryDF: org.apache.spark.sql.DataFrame = [listing_id: string, id: string ... 4 more fields]


In [3]:
// remove linhas vazias
val RemoverNullDF = ReviewsSummaryDF.na.drop()

RemoverNullDF: org.apache.spark.sql.DataFrame = [listing_id: string, id: string ... 4 more fields]


In [4]:
// exibe o squena do dataframe
RemoverNullDF.printSchema()

root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



In [5]:
// exibe o numero de linhas do dataframe
println(RemoverNullDF.count()+" linhas")

405907 linhas


In [6]:
// cria novo dataframe renomeando colunas e definindo o tipo de dados do campo data
val NewDF = RemoverNullDF.select("listing_id", "date", "reviewer_name", "comments")
                                .withColumn("date", col("date")
                                .cast("date"))
                                .withColumnRenamed("date", "data")

NewDF: org.apache.spark.sql.DataFrame = [listing_id: string, data: date ... 2 more fields]


In [7]:
// exibe o squena do dataframe
NewDF.printSchema()

root
 |-- listing_id: string (nullable = true)
 |-- data: date (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



In [8]:
NewDF.show(5)

+----------+----------+-------------+--------------------+
|listing_id|      data|reviewer_name|            comments|
+----------+----------+-------------+--------------------+
|      2015|2016-04-11|        Rahel|Mein Freund und i...|
|      2015|2016-04-15|       Hannah|Jan was very frie...|
|      2015|2016-04-26|       Victor|Un appartement tr...|
|      2015|2016-05-10|         Judy|It is really nice...|
|      2015|2016-05-14|       Romina|Buena ubicación, ...|
+----------+----------+-------------+--------------------+
only showing top 5 rows



In [11]:
// cria tabela temporária
NewDF.createOrReplaceTempView("table")

In [12]:
// cria novo dataframe a partir de consulta sql removendo campos vazios
val RemoveDuplicatesDF = spark.sql("""
SELECT
    year(data) AS year,
    month(data) AS month,
    day(data) AS day,
    reviewer_name AS name,
    comments
FROM table
ORDER BY year
""").na.drop()

RemoveDuplicatesDF: org.apache.spark.sql.DataFrame = [year: int, month: int ... 3 more fields]


In [13]:
RemoveDuplicatesDF.show(5)

+----+-----+---+--------+--------------------+
|year|month|day|    name|            comments|
+----+-----+---+--------+--------------------+
|2009|    6| 20|   Milan|excellent stay, i...|
|2009|    8| 18|     Ben|I could not have ...|
|2009|   11| 25|  Tarnia|This room is real...|
|2010|    8| 24|  Yuliya|Thank you very mu...|
|2010|   11| 24|Patricia|Fantastic, large ...|
+----+-----+---+--------+--------------------+
only showing top 5 rows



In [14]:
RemoveDuplicatesDF.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- comments: string (nullable = true)



In [16]:
// salva do dados no formato parquet particionando por year
RemoveDuplicatesDF.write.partitionBy("year")
                  .format("parquet")
                  .mode("overwrite")
                  .save("partition_year.parquet")

In [17]:
// carrega os dados de arquivos parquet
val partition_year = "/home/romerito/Dropbox/tecnology/bigdata_analytics/datasets/partition_year.parquet"
val Load_ParquetDF = spark.read.load(partition_year)

partition_year: String = /home/romerito/Dropbox/tecnology/bigdata_analytics/datasets/partition_year.parquet
Load_ParquetDF: org.apache.spark.sql.DataFrame = [month: int, day: int ... 3 more fields]


In [18]:
// cria tabela temporária
Load_ParquetDF.createOrReplaceTempView("parquet")

In [19]:
// cria novo dataframe com dados agrupados do numero de linhas por year
val CountLinesPartitionDF = spark.sql(
    """
    SELECT
    year,
    count(*) AS lines_partition
    FROM parquet
    GROUP BY year
    """
    ).na.drop()

CountLinesPartitionDF: org.apache.spark.sql.DataFrame = [year: int, lines_partition: bigint]


In [20]:
CountLinesPartitionDF.show()

+----+---------------+
|year|lines_partition|
+----+---------------+
|2018|         152182|
|2015|          39467|
|2013|           7422|
|2014|          17683|
|2012|           3092|
|2009|              3|
|2016|          69216|
|2010|            106|
|2011|            617|
|2017|         111692|
+----+---------------+



In [22]:
// salva dados no formato parquet
RemoveDuplicatesDF.write.partitionBy("year")
                  .format("parquet")
                  .mode("overwrite")
                  .save("CountLinesPartition.parquet")