a) Spark and b) Column-Oriented Storage

Submission due: 2023-03-24 23:59 ET



Part 1: a) Spark on Dataproc

Note: This part of the project goes with the lecture on Spark we just did on Monday 02/27. Do not start part b) yet. Part b) deals with Parquet and won't really make sense until the lecture on column-oriented storage on 03/06.

In this project, you will use Spark to analyze data on the Dataproc cluster, filling out lab_3_starter_code.py in the part_1_spark folder and the Report.md file found in the root of this repository.

1.1 Getting started with pyspark

For this section, you will load data and implement basic SQL queries with Spark-SQL and Spark transformations. Before beginning this project, please make sure that the data in this repository (the CSV and JSON files) are on HDFS. As an example and a reminder, to load data onto HDFS, use the following command:

hadoop fs -put boats.txt

Do the same for the reserves.json and sailors.json files, as well as artist_term.csv and tracks.csv, which will be used in the next part.
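
For example, the remaining uploads look like this (assuming the files sit in your current local directory, where they ship with the repository):

hadoop fs -put reserves.json
hadoop fs -put sailors.json
hadoop fs -put artist_term.csv
hadoop fs -put tracks.csv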

To use Spark from a Python script, you must first construct a Spark session. The following code block creates a Spark session, which you can pass into functions along with other arguments that use Spark.

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Create (or retrieve) the Spark session and hand it to main()
    spark = SparkSession.builder.appName('Spark_Session_Name').getOrCreate()
    main(spark, other_arguments)

Before beginning the project, please run lab_3_starter_code.py once to familiarize yourself with the output of the starter code. To submit a PySpark job to the cluster, call spark-submit on the Python file containing the job from the correct directory:

spark-submit --deploy-mode client lab_3_starter_code.py 

Once you submit the job to Dataproc, Spark will continuously output a log until completion. You will find that Spark outputs quite a lot of information relating to the scheduling and execution of your program, in addition to the outputs of the program itself. This can be rather difficult to read through, but it's worth getting familiar with so that you can better diagnose errors that arise in your programs.

Additionally, https://dataproc.hpc.nyu.edu/sparkhistory/ displays information about your Spark programs executing on Dataproc. You may want to use the search box to narrow this down to just your own username. Clicking through to past job runs provides detailed information about the execution of your program, including visualizations of execution time and memory consumption that may be useful in part 2 of this project.

1.2 Loading data

The following code blocks are from the example query given in the starter code. To see the output of the example code, check the stdout from yarn logs as described above after submitting the job. We first load in the boats.txt data with the following line.

boats = spark.read.csv(f'hdfs:/user/{netID}_nyu_edu/boats.txt')

This first line loads the file boats.txt as a comma-separated values (CSV) file. To read JSON files, use the spark.read.json(file_path) function. An example is shown below:

sailors = spark.read.json(f'hdfs:/user/{netID}_nyu_edu/sailors.json')

Once a DataFrame is created, you can print its contents by calling the show() method. This is not done in the starter code to save output space.

boats.show()
sailors.show()

You can also print the DataFrame's (inferred) schema by calling the printSchema() method:

boats.printSchema()
sailors.printSchema()

In the output from running the script once, you'll notice that the data loaded from the JSON file (sailors) has type information and proper column headers, while the data loaded from the CSV has string type for all columns and no column headers. Why might this be?

You can fix this by specifying the schema for boats on load:

boats = spark.read.csv('boats.txt', schema='bid INT, bname STRING, color STRING')
boats.printSchema()

After providing the schema, you should now see that the boats DataFrame has the correct type and a proper name for each column.
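
With the schema string above, the printSchema() output should look roughly like this:

root
 |-- bid: integer (nullable = true)
 |-- bname: string (nullable = true)
 |-- color: string (nullable = true)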

1.3 Creating views

Once you have DataFrames loaded, you can register them as temporary views in your Spark session:

boats.createOrReplaceTempView('boats')
sailors.createOrReplaceTempView('sailors')

Remember that a view, in RDBMS terms, is a relation that gets constructed at run-time. Registering your DataFrames as views makes it possible to execute SQL queries just as you would in a standard RDBMS:

results = spark.sql('SELECT count(*) FROM boats')

This creates a new DataFrame object results, but remember that all computation in Spark is lazy: no data will be processed until you ask for it! For example, you can print the results:

results.show()

or iterate over each row (not shown in the starter code):

for row in results.collect():
    print(row)

1.4 Results

For this project, you will complete lab_3_starter_code.py, adding your code for the following questions after the example query. You will also edit and submit the Report.md file, answering the questions listed below and copying results from your stdout where needed. Don't forget to commit changes to Report.md! As a reminder, instructions on how to get stdout are given in section 1.1.

1.5 SQL queries and DataFrame methods

As we've seen above, Spark-SQL makes it possible to use SQL to process DataFrames without a proper RDBMS like SQLite or Postgres. You can also manipulate DataFrames directly from Python. For example, the following are equivalent:

res1 = spark.sql('SELECT * FROM sailors WHERE rating > 7')
res2 = sailors.filter(sailors.rating > 7)

While any query is possible using either interface, some things are more naturally expressed in SQL, and some are easier in Python. Some fluency with writing SQL will make it easier to know when to use each interface. Before starting this section, make sure you have loaded the reserves.json file.

  • Question 1: How would you express the following computation using SQL instead of the object interface? sailors.filter(sailors.age > 40).select(sailors.sid, sailors.sname, sailors.age)

  • Question 2: How would you express the following using the object interface instead of SQL? spark.sql('SELECT sid, COUNT(bid) from reserves WHERE bid != 101 GROUP BY sid')

  • Question 3: Using a single SQL query, how many distinct boats did each sailor reserve? The resulting DataFrame should include the sailor's id, name, and the count of distinct boats. (Hint: you may need to use the first(...) aggregation function on some columns.) Provide both your query and the resulting DataFrame in your response to this question.

1.6 Bigger datasets

In this section, you will use Spark to analyze a slightly larger dataset. In the project repository, you will find the CSV files artist_term.csv and tracks.csv.

As a first step, load these files as Spark DataFrames with proper schemas. Specifically, the artist_term file should have the columns

  • artistID
  • term

and the tracks file should have the columns

  • trackID
  • title
  • release
  • year
  • duration
  • artistID

Note: Look at the first few lines of each file to determine the column types.
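As an illustration only (verify the types yourself against the raw files; here the IDs, titles, release names, and terms are assumed to be strings, year an integer, and duration a float):

artist_term = spark.read.csv(f'hdfs:/user/{netID}_nyu_edu/artist_term.csv',
                             schema='artistID STRING, term STRING')
tracks = spark.read.csv(f'hdfs:/user/{netID}_nyu_edu/tracks.csv',
                        schema='trackID STRING, title STRING, release STRING, '
                               'year INT, duration FLOAT, artistID STRING')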

  • Question 4: Implement a query using Spark transformations which, for each artist term, computes the median year of release, the maximum track duration, and the total number of artists for that term (by ID). What are the results for the ten terms with the shortest average track durations? Include both your query code and the resulting DataFrame in your response.

  • Question 5: Create a query using Spark transformations that finds the number of distinct tracks associated (through artistID) with each term. Modify this query to return only the top 10 most popular terms, and again for the bottom 10. Include each query and the tables for the top 10 most popular and the 10 least popular terms in your response.

Part 2: b) Storage Optimization

In this part of the project, you will compare the speed of Spark queries against DataFrames backed by either CSV or Parquet file stores, and optimize the storage to speed up queries. Note: this part of the project won't really make sense until the lecture on column-oriented storage on 03/06.

2.1 Getting started

In this scenario, you work as a data scientist for the "customer, consumer & credit" division of the Datamart Corporation, a flourishing e-commerce retailer in the thriving but fictional nation of Datania. Datania was founded on the very principles of data science, and as the data get ever bigger, your services are needed. Fortunately, you are well trained.

To get you started, consider a dataset with information on Datamart customers that includes names, age in years, annual income in HadoopCoin (the only cryptocurrency backed by RDD computations, so sound money), 5-digit zip codes, how many orders they placed within the last year, whether they are part of the "Data Elite" loyalty program, and whether they signed up for a rewards credit card issued by Datamart. Three versions of the data are provided:

  • hdfs:/user/pw44_nyu_edu/peopleSmall.csv: 10,000 records (588 KB)
  • hdfs:/user/pw44_nyu_edu/peopleModerate.csv: 1,000,000 records (58.8 MB)
  • hdfs:/user/pw44_nyu_edu/peopleBig.csv: 100,000,000 records (5.88 GB)

The schema for these files is as follows:

  • schema='first_name STRING, last_name STRING, age INT, income FLOAT, zipcode INT, orders INT, loyalty BOOLEAN, rewards BOOLEAN'
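
For example, loading the small file with this schema (people_schema and people are just illustrative local names):

people_schema = ('first_name STRING, last_name STRING, age INT, income FLOAT, '
                 'zipcode INT, orders INT, loyalty BOOLEAN, rewards BOOLEAN')
people = spark.read.csv('hdfs:/user/pw44_nyu_edu/peopleSmall.csv', schema=people_schema)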

In each of the sections below, you will run a set of DataFrame queries against each version of the data. This will allow you to measure how the different analyses and storage engines perform at different scales of data.

Tip: we recommend working through each part completely using just the small and medium data before starting on the large data. This will make it easier for you to debug your analysis and get familiar with the data.

2.2 Helper functions

In the part_2_storage directory, you will find a folder named queries and lab_3_storage_template_code.py. The queries folder contains seven query scripts along with bench.py.

The seven query scripts import from bench.py, which is used to conduct timing benchmarks on a Spark DataFrame query. Rather than providing the DataFrame directly to the benchmark function, you will need to write a function that constructs a new DataFrame when called.

An example of this is given in basic_query.py. The function takes in the Spark session object as well as the path to a CSV file, and returns a DataFrame of the first five people sorted by last_name, first_name.

top5 = basic_query(spark, 'hdfs:/user/pw44_nyu_edu/peopleSmall.csv')

top5.show()

The output in stdout would then be the first five rows of the data.

Rather than benchmarking top5 directly, benchmark the query function as follows:

times = bench.benchmark(spark, 5, basic_query,'hdfs:/user/pw44_nyu_edu/peopleSmall.csv')
print(times)

which should produce something like the following as output:

[6.869371175765991, 0.21157383918762207, 0.2251441478729248, 0.1284043788909912, 0.12465882301330566]

The benchmark call above constructs and evaluates the query 5 times using the peopleSmall.csv file, and returns a list of the time (in seconds) for each trial.
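
bench.py ships with the repository; for intuition only, a timing helper of this shape might look roughly like the sketch below. This is an assumption about its structure, not the actual implementation; note that any such helper must force evaluation with an action (here count()), since DataFrame construction alone is lazy.

import time

def benchmark(spark, n_trials, query_fn, *args):
    # Construct the DataFrame from scratch on every trial and force
    # evaluation with an action, timing the whole round trip.
    times = []
    for _ in range(n_trials):
        start = time.time()
        df = query_fn(spark, *args)
        df.count()  # an action, so the query actually runs
        times.append(time.time() - start)
    return times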

To simplify testing, each query goes in its own script. basic_query.py, csv_big_spender.py, csv_sum_orders.py, csv_brian.py, pq_big_spender.py, pq_sum_orders.py, and pq_brian.py each take the file path of the dataset you want to run the query on. To submit your script along with bench.py to Spark, specify bench.py after the --py-files flag, then the query file you wish to run, and finally the file path of the dataset you want to query:

spark-submit --deploy-mode client --py-files bench.py basic_query.py <your_data_file_path>

The starter code contains an example call, bench.benchmark(spark, 5, basic_query, file_path). Be sure to specify which data file to run on, or else it will not run! The CSV file paths are listed above.

lab_3_storage_template_code.py is skeleton code for connecting to an active Spark session. Feel free to use it as a template for saving the .csv files as .parquet files, as well as for optimizing the Parquet files (sections 2.4 and 2.5).

Remember: to get any outputs, you must print to stdout, which is obtained by the same steps described in section 1.1.

2.3 Benchmarking queries

Using the basic_query.py file as a template, you will fill out the following scripts:

  • csv_sum_orders.py: Contains a csv_sum_orders function that returns a DataFrame computing the total number of orders for each zipcode.

  • csv_big_spender.py: Contains a csv_big_spender function that returns a DataFrame of users who have at least 100 orders and are not currently signed up for the rewards credit card program.

  • csv_brian.py: Contains a csv_brian function that returns a DataFrame filtered down to only people with a first_name of 'Brian' who are not in the loyalty program. These lucky constituents will be targeted by our next advertising campaign!

All three of these files should work similarly to the basic_query.py example: the parameters are the Spark session object and the path in HDFS to the CSV file.

After filling out each function, benchmark its performance on each of the three data sets in the main function of each file. Each benchmark should contain 25 trials. Record the minimum, median, and maximum time to complete each query on each of the three data files.
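
For example, a minimal sketch of summarizing one benchmark with the standard library (csv_sum_orders and file_path are assumed to be defined in the enclosing script):

import statistics

import bench

times = bench.benchmark(spark, 25, csv_sum_orders, file_path)
print(f'min: {min(times):.3f}s, median: {statistics.median(times):.3f}s, max: {max(times):.3f}s')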

2.4 CSV vs Parquet

For each of the three data files (small, medium, and large), convert the data to Parquet format and store it in your own HDFS folder (e.g., hdfs:/user/YOUR_USERID/people_small.parquet). The easiest way to do this is to load the CSV file into a DataFrame in your Python script and then write it out using the DataFrame.write.parquet() method. Remember to use lab_3_storage_template_code.py if you are having difficulty connecting to a Spark session.
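
A minimal conversion sketch (the output path is illustrative; substitute your own HDFS folder):

# schema string from section 2.1
schema = ('first_name STRING, last_name STRING, age INT, income FLOAT, '
          'zipcode INT, orders INT, loyalty BOOLEAN, rewards BOOLEAN')
people = spark.read.csv('hdfs:/user/pw44_nyu_edu/peopleSmall.csv', schema=schema)
# write.parquet() creates a directory of Parquet part-files at the given path
people.write.parquet('hdfs:/user/YOUR_USERID/people_small.parquet')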

Having translated the three data files from CSV to Parquet, fill out the following files, which operate on Parquet-backed files rather than CSV (Hint: spark.read.csv does not read Parquet files!):

  • pq_sum_orders.py
  • pq_big_spender.py
  • pq_brian.py
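
For reference, reading a Parquet-backed DataFrame is a one-liner (using the illustrative path from above):

people = spark.read.parquet('hdfs:/user/YOUR_USERID/people_small.parquet')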

All three queries are the same as in the previous section. Once you fill out each query function, repeat the benchmarking experiment from part 2.3 in the main function, using these Parquet-backed sources instead of the CSV sources. Again, report the min, median, and max time for each query on each of the three files. How do they compare to the results from part 2.3?

2.5 Optimizing Parquet

In this final part, your job is to try to make things go faster! You are not allowed to change any of the query code that you wrote in the previous step, but you are allowed to change how the files are stored in HDFS using Python scripts.

There are multiple ways of optimizing Parquet structures. Some things you may want to try (the list is not exhaustive; a sketch of a few of these appears after it):

  • Sort the DataFrame according to particular columns before writing out to parquet.
  • Change the partition structure of the data. This can be done in two ways:
    • dataframe.repartition(...) (as described in lecture) and then writing the resulting dataframe back to a new file
    • Explicitly setting the partition columns when writing out to parquet. WARNING: this can be very slow!
  • Change the HDFS replication factor
  • (Non-trivial) Adjust configurations of parquet module in Spark.
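
As a hedged sketch of a few of these ideas (the column choices, partition count, and paths are illustrative, not prescriptive):

df = spark.read.parquet('hdfs:/user/YOUR_USERID/people_small.parquet')

# 1) Sort by a column a query filters or groups on before writing back out
df.sort('zipcode').write.parquet('hdfs:/user/YOUR_USERID/people_small_sorted.parquet')

# 2) Change the partition structure, then write the result to a new file
df.repartition(10).write.parquet('hdfs:/user/YOUR_USERID/people_small_repart.parquet')

# 3) Set explicit partition columns on write (WARNING: can be very slow)
df.write.partitionBy('loyalty').parquet('hdfs:/user/YOUR_USERID/people_small_byloyalty.parquet')

# The HDFS replication factor can be changed from the shell, e.g.:
#   hadoop fs -setrep -w 2 hdfs:/user/YOUR_USERID/people_small.parquet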

Each of the three queries may benefit from different types of optimization, so you may want to make multiple copies of each file. Try at least three of the approaches mentioned above and search for the best configuration for each.

Hint: you may want to look through the explain() output of each of your queries when choosing optimizations.

Results

  • The code for all of your queries (the basic_query.py, csv_big_spender.py, csv_sum_orders.py, csv_brian.py, pq_big_spender.py, pq_sum_orders.py, and pq_brian.py files) goes in the queries folder.
  • In the same Report.md as Part 1, please write a brief report with the following information:
    • Tables of all numerical results (min, max, median) for each query/size/storage combination for parts 2.3, 2.4 and 2.5.
    • How do the results in parts 2.3, 2.4 and 2.5 compare?
    • What did you try in part 2.5 to improve performance for each query?
    • What worked, and what didn't work?
