The project objective was to process a large data set on Apache Hadoop's distributed storage using the Apache Spark processing engine. For this purpose, a set of queries was implemented and run using both the Spark DataFrame and RDD APIs. The data sets used are a subset of the Yellow Taxi Trip Records, found here, involving the year 2022 and months January to June, along with the auxiliary Taxi Zone Lookup Table that contains the foreign key of location ids that map to specific boroughs and zones.
Query | Description |
---|---|
Q1 | Find the taxi trip with the maximum tip amount for the month of March where the drop-off zone is 'Battery Park'. |
Q2 | For each month, find the taxi trip with the maximum tolls amount. Ignore zero values. |
Q3 | For each consecutive 15-day time window, find the averages of total distance and amount, where the pick-up and drop-off zones are different. |
Q4 | For each day of the week, find the top 3 peak hours, meaning the 1-hour time slots of the day with the maximum number of passengers in a single taxi trip. This includes all given months, not a for-each. |
Q5 | For each month, find the top 5 days with the maximum average tip-to-fare amount ratio. |
- Apache Spark - v3.1.3
- Apache Hadoop - v2.7.7
- Python - v3.8.0
- OpenJDK - v1.8.0_292
A Hadoop & Spark cluster of 2 nodes was deployed using resources provided by GRNET's Okeanos-Knossos service. Specifically, 2 VMs were created to act in a master-slave architecture using a shared private network. Each VM was given 2 CPUs, 4GB RAM, 30GB HDD and hosted Ubuntu 16.04.7 LTS.
Node | IP | Namenode | Datanode | Master | Worker |
---|---|---|---|---|---|
Master | 192.168.0.1 | Yes | 1 | Yes | 0,1 |
Slave | 192.168.0.2 | No | 1 | No | 1 |
This section describes the steps to configure the cluster of the 2 VMs to run the code of this repository. This mainly includes system and config files edited and environment variables set. We assume that both systems have Python 3.8 and JDK 1.8 already installed.
-
Change hostnames to
master
andslave
on master and slave nodes respectively. So on master:$ sudo hostname master
and on slave:
$ sudo hostname slave
-
Edit
/etc/hosts
and map hostnames to IP address:192.168.0.1 master 192.168.0.2 slave
The master node, which will host the namenode and master servers, needs to be able to connect through ssh
to all other nodes in the cluster, in our case just the slave node. This process needs to be done without a password prompt.
-
On master, create ssh rsa public-private key pair:
$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
-
Copy the public key
id_rsa.pub
to~/.ssh/authorized_keys
file:$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
-
Copy the
authorized_keys
to slave. This enables the master node to passwordless-lyssh
to the slave node.$ scp ~/.ssh/authorized_keys slave:~/.ssh/
This section describes how to download and setup Apache Hadoop to host 1 namenode and 2 datanodes.
-
Download Hadoop on master node:
$ wget https://archive.apache.org/dist/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz ~/
-
Unzip and rename Hadoop folder:
$ tar -xzf ~/hadoop-2.7.7.tar.gz $ rm ~/hadoop-2.7.7.tar.gz $ mv ~/hadoop-2.7.7 ~/hadoop
-
Export Hadoop environment variables and add binaries to
$PATH
:$ echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64' >> ~/.bashrc $ echo 'export HADOOP_HOME=$HOME/hadoop' >> ~/.bashrc $ echo 'export HADOOP_COMMON_HOME=$HADOOP_HOME' >> ~/.bashrc $ echo 'export HADOOP_HDFS_HOME=$HADOOP_HOME' >> ~/.bashrc $ echo 'export PATH=$PATH:$HADOOP_HOME/bin' >> ~/.bashrc $ echo 'export PATH=$PATH:$HADOOP_HOME/sbin' >> ~/.bashrc
-
Source
.bashrc
:$ source ~/.bashrc
Under $HADOOP_HOME/etc/hadoop
edit the below files:
-
hadoop-env.sh
:export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
-
core-site.xml
:<configuration> <property> <name>dfs.replication</name> <value>2</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:///home/user/hdfs/name</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:///home/user/hdfs/data</value> </property> <property> <name>dfs.blocksize</name> <value>64m</value> </property> <property> <name>dfs.support.append</name> <value>true</value> </property> <property> <name>dfs.webhdfs.enabled</name> <value>true</value> </property> </configuration>
-
hdfs-site.xml
:<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://master:9000</value> </property> </configuration>
-
slaves
:$ echo 'master' > $HADOOP_HOME/etc/hadoop/slaves $ echo 'slave' >> $HADOOP_HOME/etc/hadoop/slaves
-
Copy everything to slave node:
$ scp -r ~/hadoop slave:~/ $ scp -r ~/hadoop/etc/hadoop slave:~/hadoop/etc/ $ scp ~/.bashrc slave:~/ $ ssh slave source ~/.bashrc
-
Format the HDFS on master:
$ hdfs namenode -format
-
Start the HDFS cluster by running
start-dfs.sh
on master:$ start-dfs.sh
-
Finally, run the
jps
command and verify that every component is up and running:$ jps 9618 DataNode 9483 NameNode 10204 Jps 9853 SecondaryNameNode $ ssh slave jps 23831 Jps 23690 DataNode
This section describes how to download and setup Apache Spark on Hadoop cluster, on standalone mode, that will run applications on client
deployment mode.
-
Download Spark on master node:
$ wget https://downloads.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop2.7.tgz ~/
-
Unzip and rename Spark folder:
$ tar -xzf ~/spark-3.1.3-bin-hadoop2.7.tar.gz $ rm ~/spark-3.1.3-bin-hadoop2.7.tar.gz $ mv ~/spark-3.1.3-bin-hadoop2.7 ~/spark
-
Export Spark environment variables, add binary to
$PATH
and set aliases:$ echo 'export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop' >> ~/.bashrc $ echo 'export SPARK_HOME=$HOME/spark' >> ~/.bashrc $ echo 'export PYSPARK_PYTHON=python3.8' >> ~/.bashrc $ echo 'export PYSPARK_DRIVER_PYTHON=python3.8' >> ~/.bashrc $ echo 'export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH' >> ~/.bashrc $ echo 'export PATH=$PATH:$SPARK_HOME/bin' >> ~/.bashrc $ echo 'alias start-all.sh="$SPARK_HOME/sbin/start-all.sh"' >> ~/.bashrc $ echo 'alias stop-all.sh="$SPARK_HOME/sbin/stop-all.sh"' >> ~/.bashrc $ echo 'alias start-worker.sh="$SPARK_HOME/sbin/start-worker.sh spark://master:7077"' >> ~/.bashrc $ echo 'alias stop-worker.sh="$SPARK_HOME/sbin/stop-worker.sh"' >> ~/.bashrc
-
Source
.bashrc
:$ source ~/.bashrc
Under $SPARK_HOME/conf
edit the below files:
-
spark-env.sh
:$ cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh $ echo 'SPARK_WORKER_CORES=2' > $SPARK_HOME/conf/spark-env.sh $ echo 'SPARK_WORKER_MEMORY=3g' >> $SPARK_HOME/conf/spark-env.sh
-
spark-defaults.conf
:$ cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf $ echo 'spark.master spark://master:7077' > $SPARK_HOME/conf/spark-defaults.conf $ echo 'spark.submit.deployMode client' >> $SPARK_HOME/conf/spark-defaults.conf $ echo 'spark.executor.instances 2' >> $SPARK_HOME/conf/spark-defaults.conf $ echo 'spark.executor.cores 2' >> $SPARK_HOME/conf/spark-defaults.conf $ echo 'spark.executor.memory 1536m' >> $SPARK_HOME/conf/spark-defaults.conf $ echo 'spark.driver.memory 512m' >> $SPARK_HOME/conf/spark-defaults.conf
-
workers
:$ cp $SPARK_HOME/conf/workers.template $SPARK_HOME/conf/workers $ echo 'master' > $SPARK_HOME/conf/workers $ echo 'slave' >> $SPARK_HOME/conf/workers
-
Copy everything to slave node:
$ scp -r ~/spark slave:~/ $ scp -r ~/spark/conf slave:~/spark/ $ scp ~/.bashrc slave:~/ $ ssh slave source ~/.bashrc
-
Start Spark by running
start-all.sh
on master:$ start-all.sh
-
Finally, run the
jps
command and verify that every component is up and running:$ jps 15442 Jps 9618 DataNode 15062 Worker 9483 NameNode 9853 SecondaryNameNode 14943 Master $ ssh slave jps 24233 Worker 23690 DataNode 24351 Jps
This section describes how to clone this repository on the master node and run the queries under src/main
.
-
Make directory to clone this repository, call it
atdb-project
:$ mkdir atdb-project $ cd atdb-project
-
Clone this repository:
$ git clone https://github.com/...
-
Export
$ATDB_PROJECT_HOME
environment variable pointing atatdb-project
directory:$ echo 'export ATDB_PROJECT_HOME=/path/to/atdb-project' >> ~/.bashrc
-
Source
.bashrc
:$ source ~/.bashrc
-
Install required python dependencies:
$ pip3.8 install -r requirements.txt
-
Make the directories for the Yellow Taxi Trip Records and the Taxi Zone Lookup Table files:
$ mkdir -p data/parquet data/csv
-
Download the required files:
$ for i in {1..6} > do > wget "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-0${i}.parquet" -P data/parquet/ > done $ wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv -P data/csv/
-
Upload the files to HDFS:
$ hdfs dfs -put data /
-
Test by running a query:
$ spark-submit src/main/q1_df.py
or run all using
submit-all.sh
and output logs and results underlogs
andout
directories:$ src/submit-all.sh Submitting file q1_df.py... Submitting file q2_df.py...
-
Finally, to scale down to 1 worker:
$ stop-worker.sh
and to scale back up:
$ start-worker.sh