# Datalake for USA Wildfires
### Data Engineering Capstone Project

#### Project Summary
This project builds a data lake on AWS S3 from USA wildfires and USA weather outliers data, modelled as a star schema, for both ad-hoc analysis and batch processing.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
%AddJar https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.28.0/sqlite-jdbc-3.28.0.jar

Starting download from https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.28.0/sqlite-jdbc-3.28.0.jar
Finished download of sqlite-jdbc-3.28.0.jar


In [2]:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import java.time.{ZonedDateTime, LocalDateTime, LocalDate}
import java.time.format.DateTimeFormatter

## Step 1: Scope the Project and Gather Data

#### Scope 
In this project we use Apache Spark to read a SQLite database of USA wildfires together with a temperature outliers dataset, and store the results in a data lake on S3. The data is modelled as a star schema, with each resulting table stored in its own S3 bucket and Amazon Athena on top for ad-hoc querying. The resulting data can be used to analyze geospatial information from both datasets, for example to look for a correlation between unusual weather conditions and the probability of a wildfire occurring, either by superimposing coordinates from both datasets or by using the K-means algorithm. It can also be used to measure how efficiently the responsible agencies contain wildfires.

We use the Scala API of Apache Spark for this project, with an sbt-assembly "fat" jar as the final deliverable. Scala was chosen for the performance benefits of a JVM language that is native to Spark.

#### Describe and Gather Data 
We will use:
* [1.88 Million US Wildfires dataset](https://www.kaggle.com/rtatman/188-million-us-wildfires) by Rachael Tatman

A spatial database of wildfires that occurred in the United States from 1992 to 2015. The wildfire records were acquired from the reporting systems of federal, state, and local fire organizations.

* [U.S. Weather Outliers](https://data.world/carlvlewis/u-s-weather-outliers-1964) by Berkeley Earth

U.S. Weather Outliers data from 1964 to 2013.

## Step 2: Explore and Assess the Data

### USA Wildfires Dataset

In [3]:
val df = spark.read.format("jdbc")
      .option("url", "jdbc:sqlite:/Users/yauhensobaleu/Downloads/FPA_FOD_20170508.sqlite")
      .option("dbtable",
        """
          |(select
          |   OBJECTID,
          |   FOD_ID,
          |   FPA_ID,
          |   SOURCE_SYSTEM_TYPE,
          |   SOURCE_SYSTEM,
          |   NWCG_REPORTING_AGENCY,
          |   NWCG_REPORTING_UNIT_ID,
          |   NWCG_REPORTING_UNIT_NAME,
          |   SOURCE_REPORTING_UNIT,
          |   SOURCE_REPORTING_UNIT_NAME,
          |   LOCAL_FIRE_REPORT_ID,
          |   LOCAL_INCIDENT_ID,
          |   FIRE_CODE,
          |   FIRE_NAME,
          |   ICS_209_INCIDENT_NUMBER,
          |   ICS_209_NAME,
          |   MTBS_ID,
          |   MTBS_FIRE_NAME,
          |   COMPLEX_NAME,
          |   FIRE_YEAR,
          |   DISCOVERY_DATE,
          |   DISCOVERY_DOY,
          |   DISCOVERY_TIME,
          |   STAT_CAUSE_CODE,
          |   STAT_CAUSE_DESCR,
          |   CONT_DATE,
          |   CONT_DOY,
          |   CONT_TIME,
          |   FIRE_SIZE,
          |   FIRE_SIZE_CLASS,
          |   LATITUDE,
          |   LONGITUDE,
          |   OWNER_CODE,
          |   OWNER_DESCR,
          |   STATE,
          |   COUNTY,
          |   FIPS_CODE,
          |   FIPS_NAME
          |FROM Fires)""".stripMargin)
      .option("driver", "org.sqlite.JDBC")
      .load()

df = [OBJECTID: int, FOD_ID: decimal(38,18) ... 36 more fields]


[OBJECTID: int, FOD_ID: decimal(38,18) ... 36 more fields]

In [5]:
df.count()

1880465

Let's print the schema of wildfires dataset:

In [5]:
df.printSchema()

root
 |-- OBJECTID: integer (nullable = true)
 |-- FOD_ID: decimal(38,18) (nullable = true)
 |-- FPA_ID: string (nullable = true)
 |-- SOURCE_SYSTEM_TYPE: string (nullable = true)
 |-- SOURCE_SYSTEM: string (nullable = true)
 |-- NWCG_REPORTING_AGENCY: string (nullable = true)
 |-- NWCG_REPORTING_UNIT_ID: string (nullable = true)
 |-- NWCG_REPORTING_UNIT_NAME: string (nullable = true)
 |-- SOURCE_REPORTING_UNIT: string (nullable = true)
 |-- SOURCE_REPORTING_UNIT_NAME: string (nullable = true)
 |-- LOCAL_FIRE_REPORT_ID: string (nullable = true)
 |-- LOCAL_INCIDENT_ID: string (nullable = true)
 |-- FIRE_CODE: string (nullable = true)
 |-- FIRE_NAME: string (nullable = true)
 |-- ICS_209_INCIDENT_NUMBER: string (nullable = true)
 |-- ICS_209_NAME: string (nullable = true)
 |-- MTBS_ID: string (nullable = true)
 |-- MTBS_FIRE_NAME: string (nullable = true)
 |-- COMPLEX_NAME: string (nullable = true)
 |-- FIRE_YEAR: decimal(38,18) (nullable = true)
 |-- DISCOVERY_DATE: decimal(38,18) (null

#### Issue 1. Decimal precision is too big

Some columns have the `Decimal(38,18)` data type, which we can convert to a more readable and less memory-consuming format. Let's list all decimal columns:

In [94]:
val decimalColumns = for (dtype <- df.dtypes if (dtype._2.startsWith("DecimalType")))
  yield dtype._1

decimalColumns = Array(FOD_ID, FIRE_YEAR, DISCOVERY_DATE, DISCOVERY_DOY, STAT_CAUSE_CODE, CONT_DATE, CONT_DOY, FIRE_SIZE, LATITUDE, LONGITUDE, OWNER_CODE)


[FOD_ID, FIRE_YEAR, DISCOVERY_DATE, DISCOVERY_DOY, STAT_CAUSE_CODE, CONT_DATE, CONT_DOY, FIRE_SIZE, LATITUDE, LONGITUDE, OWNER_CODE]

Let's take a look at these columns

In [17]:
%%dataframe --limit 2
df.select(decimalColumns.map(name => col(name)):_*)

FOD_ID,FIRE_YEAR,DISCOVERY_DATE,DISCOVERY_DOY,STAT_CAUSE_CODE,CONT_DATE,CONT_DOY,FIRE_SIZE,LATITUDE,LONGITUDE,OWNER_CODE
1.0,2005.0,2453403.5,33.0,9.0,2453403.5,33.0,0.1,40.03694444,-121.00583333,5.0
2.0,2004.0,2453137.5,133.0,1.0,2453137.5,133.0,0.25,38.93305556,-120.40444444,5.0


Let's convert integer-valued columns to `IntegerType`

In [6]:
val dfWithIntegers = df
    .withColumn("FOD_ID", $"FOD_ID".cast("int"))
    .withColumn("FIRE_YEAR", $"FIRE_YEAR".cast("int"))
    .withColumn("DISCOVERY_DOY", $"DISCOVERY_DOY".cast("int"))
    .withColumn("STAT_CAUSE_CODE", $"STAT_CAUSE_CODE".cast("int"))
    .withColumn("CONT_DOY", $"CONT_DOY".cast("int"))
    .withColumn("OWNER_CODE", $"OWNER_CODE".cast("int"))

dfWithIntegers = [OBJECTID: int, FOD_ID: int ... 36 more fields]


[OBJECTID: int, FOD_ID: int ... 36 more fields]

In [95]:
%%dataframe --limit 2
dfWithIntegers.select(decimalColumns.map(name => col(name)):_*)

FOD_ID,FIRE_YEAR,DISCOVERY_DATE,DISCOVERY_DOY,STAT_CAUSE_CODE,CONT_DATE,CONT_DOY,FIRE_SIZE,LATITUDE,LONGITUDE,OWNER_CODE
1,2005,2453403.5,33,9,2453403.5,33,0.1,40.03694444,-121.00583333,5
2,2004,2453137.5,133,1,2453137.5,133,0.25,38.93305556,-120.40444444,5


Some columns do hold fractional values. Let's find the maximum values of these columns to see how far we can reduce the precision and scale of their decimal types:

In [20]:
dfWithIntegers.agg("DISCOVERY_DATE" -> "max", "CONT_DATE" -> "max", "FIRE_SIZE" -> "max").show(false)

+--------------------------+--------------------------+-------------------------+
|max(DISCOVERY_DATE)       |max(CONT_DATE)            |max(FIRE_SIZE)           |
+--------------------------+--------------------------+-------------------------+
|2457387.500000000000000000|2457391.500000000000000000|606945.000000000000000000|
+--------------------------+--------------------------+-------------------------+



The largest value has seven integer digits, so `decimal(10,2)` is enough for these columns. The coordinate columns are the exception: they need eight fractional digits plus up to three integer digits, so they get `decimal(11,8)`.

In [7]:
val properDecimalSize = "decimal(10,2)"
val coordinatesDecimalSize = "decimal(11,8)"

val dfWithIntegersProperDecimals = dfWithIntegers
    .withColumn("DISCOVERY_DATE", $"DISCOVERY_DATE".cast(properDecimalSize))
    .withColumn("CONT_DATE", $"CONT_DATE".cast(properDecimalSize))
    .withColumn("FIRE_SIZE", $"FIRE_SIZE".cast(properDecimalSize))
    .withColumn("LATITUDE", $"LATITUDE".cast(coordinatesDecimalSize))
    .withColumn("LONGITUDE", $"LONGITUDE".cast(coordinatesDecimalSize))

properDecimalSize = decimal(10,2)
coordinatesDecimalSize = decimal(11,8)
dfWithIntegersProperDecimals = [OBJECTID: int, FOD_ID: int ... 36 more fields]


[OBJECTID: int, FOD_ID: int ... 36 more fields]

Now it looks more friendly to an end user

In [22]:
%%dataframe --limit 2
dfWithIntegersProperDecimals

OBJECTID,FOD_ID,FPA_ID,SOURCE_SYSTEM_TYPE,SOURCE_SYSTEM,NWCG_REPORTING_AGENCY,NWCG_REPORTING_UNIT_ID,NWCG_REPORTING_UNIT_NAME,SOURCE_REPORTING_UNIT,SOURCE_REPORTING_UNIT_NAME,LOCAL_FIRE_REPORT_ID,LOCAL_INCIDENT_ID,FIRE_CODE,FIRE_NAME,ICS_209_INCIDENT_NUMBER,ICS_209_NAME,MTBS_ID,MTBS_FIRE_NAME,COMPLEX_NAME,FIRE_YEAR,DISCOVERY_DATE,DISCOVERY_DOY,DISCOVERY_TIME,STAT_CAUSE_CODE,STAT_CAUSE_DESCR,CONT_DATE,CONT_DOY,CONT_TIME,FIRE_SIZE,FIRE_SIZE_CLASS,LATITUDE,LONGITUDE,OWNER_CODE,OWNER_DESCR,STATE,COUNTY,FIPS_CODE,FIPS_NAME
1,1,FS-1418826,FED,FS-FIRESTAT,FS,USCAPNF,Plumas National Forest,511,Plumas National Forest,1,PNF-47,BJ8K,FOUNTAIN,,,,,,2005,2453403.5,33,1300,9,Miscellaneous,2453403.5,33,1730,0.1,A,40.03694444,-121.00583333,5,USFS,CA,63,63,Plumas
2,2,FS-1418827,FED,FS-FIRESTAT,FS,USCAENF,Eldorado National Forest,503,Eldorado National Forest,13,13,AAC0,PIGEON,,,,,,2004,2453137.5,133,845,1,Lightning,2453137.5,133,1530,0.25,A,38.93305556,-120.40444444,5,USFS,CA,61,61,Placer
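
As a sanity check on the decimal sizes chosen in this section, here is a minimal plain-Scala sketch (no Spark needed) that tests whether a value fits a given `decimal(precision, scale)`. The `DecimalFit` object and its `fits` helper are hypothetical names introduced for illustration, and half-up rounding is an assumption of the sketch.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object DecimalFit {
  // True when `value`, rounded to `scale` fractional digits, needs at most
  // (precision - scale) integer digits, i.e. it fits decimal(precision, scale).
  // Half-up rounding is an assumption made for this sketch.
  def fits(value: String, precision: Int, scale: Int): Boolean = {
    val rounded = new JBigDecimal(value).setScale(scale, RoundingMode.HALF_UP)
    val integerDigits = rounded.precision - rounded.scale
    integerDigits <= precision - scale
  }
}
```

With the maxima printed above, `DecimalFit.fits("2457391.5", 10, 2)` and `DecimalFit.fits("606945.0", 10, 2)` both hold, while a longitude such as `-121.00583333` needs `(11, 8)` because of its three integer digits; the sign itself does not consume a digit.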


#### Issue 2. Null values
Let's take a quick look at the number of NULL values in every column of the dataset:

In [99]:
val nullCountsPerColumn = dfWithIntegersProperDecimals.columns.map(
    c => (c, dfWithIntegersProperDecimals.agg(
        sum(when(dfWithIntegersProperDecimals(c).isNull, 1).otherwise(0)).alias(c)
    ).first().getLong(0))
)

nullCountsPerColumn = Array((OBJECTID,0), (FOD_ID,0), (FPA_ID,0), (SOURCE_SYSTEM_TYPE,0), (SOURCE_SYSTEM,0), (NWCG_REPORTING_AGENCY,0), (NWCG_REPORTING_UNIT_ID,0), (NWCG_REPORTING_UNIT_NAME,0), (SOURCE_REPORTING_UNIT,0), (SOURCE_REPORTING_UNIT_NAME,0), (LOCAL_FIRE_REPORT_ID,1459286), (LOCAL_INCIDENT_ID,820821), (FIRE_CODE,1555636), (FIRE_NAME,957189), (ICS_209_INCIDENT_NUMBER,1854748), (ICS_209_NAME,1854748), (MTBS_ID,1869462), (MTBS_FIRE_NAME,1869462), (COMPLEX_NAME,1875282), (FIRE_YEAR,0), (DISCOVERY_DATE,0), (DISCOVERY_DOY,0), (DISCOVERY_TIME,882638), (STAT_CAUSE_CODE,0), (STAT_CAUSE_DESCR,0), (CONT_DATE,891531), (CONT_DOY,891531), (CONT_TIME,972173), (FIRE_SIZE,0), (FIRE_SIZE_CLASS,0), (LATITUDE,0), (LONGITUDE,0), (OWNER_CODE,0), (OWNER_DESCR,0), (STATE,0), (C...


[(OBJECTID,0), (FOD_ID,0), (FPA_ID,0), (SOURCE_SYSTEM_TYPE,0), (SOURCE_SYSTEM,0), (NWCG_REPORTING_AGENCY,0), (NWCG_REPORTING_UNIT_ID,0), (NWCG_REPORTING_UNIT_NAME,0), (SOURCE_REPORTING_UNIT,0), (SOURCE_REPORTING_UNIT_NAME,0), (LOCAL_FIRE_REPORT_ID,1459286), (LOCAL_INCIDENT_ID,820821), (FIRE_CODE,1555636), (FIRE_NAME,957189), (ICS_209_INCIDENT_NUMBER,1854748), (ICS_209_NAME,1854748), (MTBS_ID,1869462), (MTBS_FIRE_NAME,1869462), (COMPLEX_NAME,1875282), (FIRE_YEAR,0), (DISCOVERY_DATE,0), (DISCOVERY_DOY,0), (DISCOVERY_TIME,882638), (STAT_CAUSE_CODE,0), (STAT_CAUSE_DESCR,0), (CONT_DATE,891531), (CONT_DOY,891531), (CONT_TIME,972173), (FIRE_SIZE,0), (FIRE_SIZE_CLASS,0), (LATITUDE,0), (LONGITUDE,0), (OWNER_CODE,0), (OWNER_DESCR,0), (STATE,0), (COUNTY,678148), (FIPS_CODE,678148), (FIPS_NAME,678148)]

In [101]:
%%dataframe --limit 50
sc.parallelize(nullCountsPerColumn).toDF("column", "countna")

column,countna
OBJECTID,0
FOD_ID,0
FPA_ID,0
SOURCE_SYSTEM_TYPE,0
SOURCE_SYSTEM,0
NWCG_REPORTING_AGENCY,0
NWCG_REPORTING_UNIT_ID,0
NWCG_REPORTING_UNIT_NAME,0
SOURCE_REPORTING_UNIT,0
SOURCE_REPORTING_UNIT_NAME,0


Several columns have a large number of null values. Most importantly, every wildfire record has its `DISCOVERY_DATE` specified, so we can rely on 100% coverage of this column for downstream purposes (for example, partitioning the resulting Parquet files by discovery date rather than by containment date, which has a large share of nulls). Note also that we can't always determine the exact time a wildfire was discovered, because the `DISCOVERY_TIME` column has nulls as well.
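
To put these counts in proportion, the following plain-Scala sketch turns them into null shares. The numbers are copied from the outputs above; the `NullShare` object is a hypothetical name used only for this illustration.

```scala
object NullShare {
  // Row count and null counts copied from the notebook outputs above.
  val totalRows = 1880465L
  val nullCounts = Map(
    "DISCOVERY_DATE" -> 0L,
    "DISCOVERY_TIME" -> 882638L,
    "CONT_DATE"      -> 891531L,
    "CONT_TIME"      -> 972173L
  )

  // Fraction of rows that are null, per column.
  val nullShare: Map[String, Double] =
    nullCounts.map { case (name, n) => name -> n.toDouble / totalRows }
}
```

Roughly 47% of the records lack a discovery time or a containment date, and about 52% lack a containment time, which is why only `DISCOVERY_DATE` is a safe partitioning candidate.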

#### Issue 3. Multiple date columns in Julian format
The `CONT_DATE` and `DISCOVERY_DATE` columns are in Julian date format. The dataset also has separate `*_DATE` and `*_TIME` columns, which we can combine into a single timestamp column, keeping in mind that the `*_TIME` columns can contain null values.

We will also discard the `*_DOY` columns, as this data will be present in the date dimension dataframe.

First, let's fill null values in the `*_TIME` columns with `0000` so that they can be concatenated with the date values:

In [13]:
val dfFilled = dfWithIntegersProperDecimals.na.fill("0000", Seq("CONT_TIME", "DISCOVERY_TIME"))

dfFilled = [OBJECTID: int, FOD_ID: int ... 36 more fields]


[OBJECTID: int, FOD_ID: int ... 36 more fields]

Define a UDF that converts a Julian date to an ISO calendar date string (`yyyy-MM-dd`)

In [8]:
def convertJulianDate = udf((julianDate: Double) => {
  // Split the Julian date into whole days and the fraction of a day
  val julianDayNumber = math.floor(julianDate).toLong
  val julianDayFraction = julianDate - julianDayNumber
  val julianDayFractionToNanoSeconds = math.floor(julianDayFraction * 24 * 60 * 60 * math.pow(10, 9)).toLong

  // Julian day 0 starts at noon GMT on 24 November 4714 BC (proleptic Gregorian calendar)
  val bcEraDateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:z:G")
  val julianDateStartDate = ZonedDateTime.parse("4714-11-24 12:00:00:GMT:BC", bcEraDateFormat)

  val zonedDateTime = julianDateStartDate.plusDays(julianDayNumber).plusNanos(julianDayFractionToNanoSeconds)

  // Keep only the calendar date; the time of day comes from the *_TIME columns
  zonedDateTime.toLocalDate.toString()
}
)

convertJulianDate: org.apache.spark.sql.expressions.UserDefinedFunction
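
The same conversion can be cross-checked without Spark. The sketch below is an alternative formulation, not the UDF used in this notebook: it relies on the fact that the Unix epoch (1970-01-01T00:00 UTC) is Julian date 2440587.5. The `JulianDates` object and its member names are hypothetical.

```scala
import java.time.LocalDate

object JulianDates {
  // Julian date of the Unix epoch, 1970-01-01T00:00 UTC.
  val UnixEpochJulianDate = 2440587.5

  // Convert a Julian date to a calendar date, dropping the time-of-day fraction.
  def toLocalDate(julianDate: Double): LocalDate =
    LocalDate.ofEpochDay(math.floor(julianDate - UnixEpochJulianDate).toLong)
}
```

For the first sample row, `JulianDates.toLocalDate(2453403.5)` gives 2005-02-02, which is day 33 of the year and so agrees with `DISCOVERY_DOY`.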


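Define a custom function that concatenates the date and time columns of the wildfires dataset into a timestamp. The regular expression inserts a colon after every two characters except the last pair, turning `1300` into `13:00`: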

In [11]:
def concatDateAndTime(df:DataFrame, colName: String): DataFrame = {
    
    val dateColumn = colName + "_DATE"
    val timeColumn = colName + "_TIME"
    
    val resultDF = df.withColumn(colName + "_TIMESTAMP", 
        concat(
            convertJulianDate(col(dateColumn)), lit(" "), regexp_replace(col(timeColumn), "..(?!$)", "$0:"), lit(":00")
    ).cast("timestamp"))
    
    resultDF
}

concatDateAndTime: (df: org.apache.spark.sql.DataFrame, colName: String)org.apache.spark.sql.DataFrame
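
The time formatting step can be tried in isolation with plain Scala strings. This is a sketch rather than the notebook's code: the `FireTimes` object is a hypothetical name, and the left-padding to four digits is an added assumption that guards against values arriving without their leading zero.

```scala
object FireTimes {
  // Turn an "HHmm" string into "HH:mm:00". Left-padding to four digits is an
  // assumption added here so that a value such as "845" is read as 08:45.
  def toClock(hhmm: String): String = {
    val padded = ("0000" + hhmm).takeRight(4)
    // Same regular expression as above: a colon after every pair but the last.
    padded.replaceAll("..(?!$)", "$0:") + ":00"
  }
}
```

Without the padding, the regular expression alone would turn `845` into `84:5`, which does not cast to a timestamp.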


Create the timestamp columns and drop the now-redundant `*_DOY`, `*_DATE` and `*_TIME` columns (`FIRE_YEAR` is kept):

In [14]:
val dfWithTimestampsDiscovery = concatDateAndTime(dfFilled, "DISCOVERY")
val dfWithTimestampsCont = concatDateAndTime(dfWithTimestampsDiscovery, "CONT")

val stagedDF = dfWithTimestampsCont
    .toDF(dfWithTimestampsCont.columns.map(name => name.toLowerCase()):_*)
    .drop( 
    "discovery_doy", 
    "discovery_date",
    "discovery_time",
    "cont_doy",
    "cont_date",
    "cont_time"
)

dfWithTimestampsDiscovery = [OBJECTID: int, FOD_ID: int ... 37 more fields]
dfWithTimestampsCont = [OBJECTID: int, FOD_ID: int ... 38 more fields]
stagedDF = [objectid: int, fod_id: int ... 32 more fields]


[objectid: int, fod_id: int ... 32 more fields]

### Temperature dataset

In [1]:
val temperatures = spark.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .load("/Users/yauhensobaleu/Downloads/weather-anomalies-1964-2013.csv")
    .drop("id", "station_name")

temperatures = [date_str: string, degrees_from_mean: string ... 6 more fields]


[date_str: string, degrees_from_mean: string ... 6 more fields]

We're interested in temperature data starting from 1992-01-01 only:

In [3]:
val stagedWeatherDF = temperatures
    .withColumn("date_str", $"date_str".cast("date"))
    .filter($"date_str" >= "1992-01-01")

stagedWeatherDF = [date_str: date, degrees_from_mean: string ... 6 more fields]


[date_str: date, degrees_from_mean: string ... 6 more fields]

In [4]:
stagedWeatherDF.count()

1352248

In [154]:
%%dataframe --limit 5
stagedWeatherDF

date_str,degrees_from_mean,longitude,latitude,max_temp,min_temp,type,serialid
2008-11-02,15.14,-99.0383,45.4478,20.0,0.0,Weak Hot,287175
2002-06-19,7.02,-106.4278,36.2403,35.6,13.3,Weak Hot,300563
2002-06-19,9.32,-84.9767,45.3725,31.1,20.6,Weak Hot,300564
2002-06-19,14.86,-117.9528,36.1389,40.6,21.7,Strong Hot,300565
2002-06-19,9.52,-110.395,40.1678,29.4,18.3,Weak Hot,300566


## Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The project's data model consists of two fact tables, `fires_fact` and `weather_fact`, with the appropriate SCD type 1 dimension tables, representing data from the USA wildfires and U.S. Weather Outliers datasets respectively. In terms of physical representation, every table in our schema is a bucket with an arbitrary number of Parquet files on AWS S3.

Data model of the project:
![Data Model](https://i.imgur.com/oTnb9x1.jpg)

Dimensions of `fires_fact` table:
* `sources` - traces a record back to the original source that the USA wildfires dataset was compiled from
* `reporter` - lists the agencies that reported a wildfire
* `fire_names` - names of wildfires (if any)
* `fire_cause` - causes of fires and their codes
* `fire_classes` - codes for fire size based on the number of acres within the final fire perimeter expenditures
* `owners` - codes for primary owners or entities responsible for managing the land at the point of origin of the fire
* `locations` - locations of a wildfire

Dimensions of `weather_fact` table:
* `weather_types` - types of weather (e.g. Very Cold)

Common dimension:
* `dates` - dates dimension

#### 3.2 Mapping Out Data Pipelines
1. Create a Spark job that will:
    1. Read all datasets into Spark dataframes
    2. Clean and conform data in these datasets
    3. Create dimension tables 
    4. Populate fact tables by joining in the foreign keys of all related dimension tables
    5. Store results on AWS S3
2. Package spark application in a "fat" jar with custom libraries
3. Launch spark-submit to execute spark application
4. Create Amazon Athena tables on top of AWS S3 buckets and perform data quality checks

## Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model

## Dimensions

Let's define a function that creates a dimension dataset with `SELECT DISTINCT` logic and a serial column that uniquely identifies each record.

NB: It's important to coalesce the query result into one partition, because the values generated by `monotonically_increasing_id()` are consecutive only within a single partition (see [docs](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id))

In [23]:
def createDimTable(df: DataFrame, id: String, cols: Seq[String]): DataFrame = {
    val raw_col_names = cols.map(name => name.toLowerCase())
    val dim_id = id.toLowerCase()
    val nullPlaceholder = "unknown" 
    
    val rawDimTable = df
        .select(raw_col_names.map(name => col(name)):_*)
//         .na.fill(nullPlaceholder) /* replace all nulls with a string placeholder for easier joins */
        .distinct 
        .coalesce(1) /* coalesce df in one partition to guarantee that monotonically_increasing_id() provide consecutive result */
        .orderBy(raw_col_names(0))
        .withColumn(dim_id, monotonically_increasing_id() + 1)
    
    val col_names = Seq(dim_id) ++ raw_col_names
    val dimTable = rawDimTable.select(col_names.map(name => col(name)):_*)
    
    dimTable
}

createDimTable: (df: org.apache.spark.sql.DataFrame, id: String, cols: Seq[String])org.apache.spark.sql.DataFrame
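
To see why the single partition matters, the sketch below reproduces the id layout described in the Spark documentation for `monotonically_increasing_id()`: the partition index goes into the upper 31 bits and the record position within the partition into the lower 33 bits. The `MonotonicIds` object and `idFor` helper are hypothetical names; this is plain arithmetic, not a call into Spark.

```scala
object MonotonicIds {
  // Layout documented for monotonically_increasing_id(): partition index in the
  // upper 31 bits, record position within the partition in the lower 33 bits.
  def idFor(partitionIndex: Int, rowInPartition: Long): Long =
    (partitionIndex.toLong << 33) + rowInPartition
}
```

Rows in partition 0 get 0, 1, 2, and so on, but the first row of partition 1 already gets 8589934592, so surrogate keys stay dense only when all rows sit in one partition.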


### Source dimension

In [24]:
val sourceCols = Seq(
    "source_system_type", 
    "source_system", 
    "source_reporting_unit", 
    "source_reporting_unit_name")

val sources = createDimTable(stagedDF, "source_id", sourceCols)

sourceCols = List(source_system_type, source_system, source_reporting_unit, source_reporting_unit_name)
sources = [source_id: bigint, source_system_type: string ... 3 more fields]


[source_id: bigint, source_system_type: string ... 3 more fields]

In [8]:
%%dataframe --limit 5
sources

source_id,source_system_type,source_system,source_reporting_unit,source_reporting_unit_name
1,FED,FS-FIRESTAT,0507,Los Padres National Forest
2,FED,FS-FIRESTAT,0412,Payette National Forest
3,FED,DOI-WFMI,MTRBA,Rocky Boy's Agency
4,FED,DOI-WFMI,IDBUD,Burley Field Office
5,FED,DOI-WFMI,ARHOP,Hot Springs National Park


### Reports dimension

In [25]:
val reportsCols = Seq(
    "nwcg_reporting_agency", 
    "nwcg_reporting_unit_id", 
    "nwcg_reporting_unit_name"
    )

val reports = createDimTable(stagedDF, "reporter_id", reportsCols)

reportsCols = List(nwcg_reporting_agency, nwcg_reporting_unit_id, nwcg_reporting_unit_name)
reports = [reporter_id: bigint, nwcg_reporting_agency: string ... 2 more fields]


[reporter_id: bigint, nwcg_reporting_agency: string ... 2 more fields]

In [10]:
%%dataframe --limit 5
reports

reporter_id,nwcg_reporting_agency,nwcg_reporting_unit_id,nwcg_reporting_unit_name
1,BIA,USNVWNA,Western Nevada Agency
2,BIA,USMTCRA,Crow Agency
3,BIA,USAZSCA,San Carlos Agency
4,BIA,USMNRLA,Red Lake Agency
5,BIA,USMTNCA,Northern Cheyenne Agency


### Fire_names dim

In [26]:
val fireNamesCols = Seq(
    "fire_name",
    "ics_209_name",
    "fire_code", 
    "mtbs_fire_name")

val fireNames = createDimTable(stagedDF, "fire_name_id", fireNamesCols)

fireNamesCols = List(fire_name, ics_209_name, fire_code, mtbs_fire_name)
fireNames = [fire_name_id: bigint, fire_name: string ... 3 more fields]


[fire_name_id: bigint, fire_name: string ... 3 more fields]

In [12]:
%%dataframe --limit 5
fireNames

fire_name_id,fire_name,ics_209_name,fire_code,mtbs_fire_name
1,0703,unknown,JAQ9,unknown
2,"1789 CO RD 21 N, AL",unknown,unknown,unknown
3,201 SELAWIK RIVER #4,unknown,GXH6,unknown
4,"2478 CENTER CHURCH RD, AL",unknown,unknown,unknown
5,"3000 HOMESTEAD, AL",unknown,unknown,unknown


### Fire cause dim

In [27]:
val fireCausesCols = Seq(
    "stat_cause_code",
    "stat_cause_descr")

val fireCauses = createDimTable(stagedDF, "fire_cause_id", fireCausesCols)

fireCausesCols = List(stat_cause_code, stat_cause_descr)
fireCauses = [fire_cause_id: bigint, stat_cause_code: int ... 1 more field]


[fire_cause_id: bigint, stat_cause_code: int ... 1 more field]

In [14]:
%%dataframe --limit 5
fireCauses

fire_cause_id,stat_cause_code,stat_cause_descr
1,1,Lightning
2,2,Equipment Use
3,3,Smoking
4,4,Campfire
5,5,Debris Burning


### Fire sizes dim

In [28]:
val fireSizesCols = Seq(
    "fire_size_class")

val fireSizes = createDimTable(stagedDF, "fire_size_id", fireSizesCols)
        .withColumn("lower_bound", 
                when($"fire_size_class" === "A", 0)
                .when($"fire_size_class" === "B", 0.26)
                .when($"fire_size_class" === "C", 10.0)
                .when($"fire_size_class" === "D", 100)
                .when($"fire_size_class" === "E", 300)
                .when($"fire_size_class" === "F", 1000)
                .when($"fire_size_class" === "G", 5000)
                   )
        .withColumn("upper_bound", 
                when($"fire_size_class" === "A", 0.25)
                .when($"fire_size_class" === "B", 9.9)
                .when($"fire_size_class" === "C", 99.9)
                .when($"fire_size_class" === "D", 299)
                .when($"fire_size_class" === "E", 999)
                .when($"fire_size_class" === "F", 4999)
                .when($"fire_size_class" === "G", null)
                   )

fireSizesCols = List(fire_size_class)
fireSizes = [fire_size_id: bigint, fire_size_class: string ... 2 more fields]


[fire_size_id: bigint, fire_size_class: string ... 2 more fields]

In [16]:
%%dataframe --limit 10
fireSizes

fire_size_id,fire_size_class,lower_bound,upper_bound
1,A,0.0,0.25
2,B,0.26,9.9
3,C,10.0,99.9
4,D,100.0,299.0
5,E,300.0,999.0
6,F,1000.0,4999.0
7,G,5000.0,
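
The bounds in this table can also be read the other way round, from a fire size in acres to its class. The plain-Scala sketch below does that using the lower bounds only, so sizes that fall in the small gaps between an upper bound and the next lower bound (for example 0.255) are assigned to the smaller class; that choice, and the `FireSizeClasses` name, are assumptions of the sketch.

```scala
object FireSizeClasses {
  // Lower bound in acres of each class, taken from the table above,
  // ordered from the largest class down to the smallest.
  private val lowerBounds = Seq(
    "G" -> 5000.0, "F" -> 1000.0, "E" -> 300.0,
    "D" -> 100.0, "C" -> 10.0, "B" -> 0.26, "A" -> 0.0
  )

  // Return the class with the largest lower bound not exceeding `acres`,
  // or None for a negative size.
  def classify(acres: Double): Option[String] =
    lowerBounds.collectFirst { case (cls, lower) if acres >= lower => cls }
}
```

The two sample rows shown earlier, with sizes 0.1 and 0.25, both land in class A, matching their `FIRE_SIZE_CLASS` values.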


### Owners dim

In [29]:
val ownersCols = Seq(
    "owner_code",
    "owner_descr")

val owners = createDimTable(stagedDF, "owner_id", ownersCols)

ownersCols = List(owner_code, owner_descr)
owners = [owner_id: bigint, owner_code: int ... 1 more field]


[owner_id: bigint, owner_code: int ... 1 more field]

In [18]:
%%dataframe --limit 5
owners

owner_id,owner_code,owner_descr
1,0,FOREIGN
2,1,BLM
3,2,BIA
4,3,NPS
5,4,FWS


### Locations dim

In [30]:
val locationsCols = Seq(
    "state",
    "fips_name")

val locations = createDimTable(stagedDF, "location_id", locationsCols)

locationsCols = List(state, fips_name)
locations = [location_id: bigint, state: string ... 1 more field]


[location_id: bigint, state: string ... 1 more field]

In [20]:
%%dataframe --limit 5
locations

location_id,state,fips_name
1,AK,Prince of Wales-Outer Ketchikan
2,AK,Valdez-Cordova
3,AK,Wade Hampton
4,AK,Kenai Peninsula
5,AK,Ketchikan Gateway


### Date dimension

In [31]:
def createDatesDimDimension(date_start:String, date_end:String): DataFrame = {
    
    def dayIterator(start: LocalDate, end: LocalDate) = Iterator.iterate(start)(_ plusDays 1) takeWhile (_ isBefore end)

    val dates = dayIterator(LocalDate.parse(date_start), LocalDate.parse(date_end))
    
    val df_dates = spark.sparkContext
        .parallelize(dates.map(_.toString).toSeq)
        .toDF("date")
        .withColumn("date", col("date").cast("date"))
        .coalesce(1)
    
    val date_dim = df_dates
    .withColumn("date_dim_id", monotonically_increasing_id() + 1)
    .withColumn("date_type", lit("date"))
    .withColumn("date_actual", $"date")
    .withColumn("epoch", unix_timestamp($"date".cast("timestamp")))
    .withColumn("day_name", date_format($"date", "EEEE"))
    .withColumn("day_of_week", dayofweek($"date"))
    .withColumn("day_of_month", dayofmonth($"date"))
    .withColumn("day_of_year", dayofyear($"date"))
    .withColumn("week_of_month", date_format($"date", "W"))
    .withColumn("week_of_year", weekofyear($"date"))
    .withColumn("month_actual", month($"date"))
    .withColumn("month_name", date_format($"date", "MMMM"))
    .withColumn("month_name_abbreviated", date_format($"date", "MMM"))
    .withColumn("quarter_actual", quarter($"date"))
    .withColumn("quarter_name", 
                when($"quarter_actual" === 1, "First")
                when($"quarter_actual" === 2, "Second")
                when($"quarter_actual" === 3, "Third")
                when($"quarter_actual" === 4, "Fourth"))
    .withColumn("year_actual", year($"date"))
    .drop("date")
    
    val max_id = date_dim.agg("date_dim_id" -> "max").first().getLong(0) + 1
    val unknownRecord = Seq((max_id, "unknown", null, null, null, null, null, null, null, null, null, null, null, null, null, null)).toDF()

    val finalDatesDF = date_dim.union(unknownRecord)
    finalDatesDF
}

createDatesDimDimension: (date_start: String, date_end: String)org.apache.spark.sql.DataFrame


In [32]:
%%dataframe --limit 5
val dates = createDatesDimDimension("1990-01-01", "2020-01-01")

dates = [date_dim_id: bigint, date_type: string ... 14 more fields]


date_dim_id,date_type,date_actual,epoch,day_name,day_of_week,day_of_month,day_of_year,week_of_month,week_of_year,month_actual,month_name,month_name_abbreviated,quarter_actual,quarter_name,year_actual
1,date,1990-01-01,631141200,Monday,2,1,1,1,1,1,January,Jan,1,First,1990
2,date,1990-01-02,631227600,Tuesday,3,2,2,1,1,1,January,Jan,1,First,1990
3,date,1990-01-03,631314000,Wednesday,4,3,3,1,1,1,January,Jan,1,First,1990
4,date,1990-01-04,631400400,Thursday,5,4,4,1,1,1,January,Jan,1,First,1990
5,date,1990-01-05,631486800,Friday,6,5,5,1,1,1,January,Jan,1,First,1990
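
A few of the date attributes can be checked with `java.time` alone. The sketch below is illustrative and uses hypothetical names (`DateDimSketch`, `dayCount`, `sparkDayOfWeek`, `quarter`); it mirrors the end-exclusive date range and the Sunday-based day numbering visible in the output above.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object DateDimSketch {
  // Number of date rows produced for [start, end): one per day, `end` excluded.
  def dayCount(start: String, end: String): Long =
    ChronoUnit.DAYS.between(LocalDate.parse(start), LocalDate.parse(end))

  // Spark's dayofweek() numbers Sunday as 1 and Saturday as 7, whereas
  // java.time numbers Monday as 1 and Sunday as 7.
  def sparkDayOfWeek(date: LocalDate): Int =
    date.getDayOfWeek.getValue % 7 + 1

  // Calendar quarter, 1 to 4.
  def quarter(date: LocalDate): Int = (date.getMonthValue - 1) / 3 + 1
}
```

For the range used here, `DateDimSketch.dayCount("1990-01-01", "2020-01-01")` is 10957, so the dimension holds that many date rows plus the extra `unknown` record.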


### Weather types dim

In [160]:
val weatherTypesCols = Seq(
    "type")

val weatherTypes = createDimTable(stagedWeatherDF, "weather_type_id", weatherTypesCols)

weatherTypesCols = List(type)
weatherTypes = [weather_type_id: bigint, type: string]


[weather_type_id: bigint, type: string]

In [161]:
%%dataframe --limit 5
weatherTypes

weather_type_id,type
1,Strong Cold
2,Strong Hot
3,Weak Cold
4,Weak Hot


### Facts table

During fact table design a couple of questions arose:
1. If we have a date dimension, do we also need a time dimension? If yes, it will be enormous; if no, how do we support queries on precise time?
2. What do we do with identifiers that can't be used as a fact table surrogate primary key but nevertheless need to be present in the data lake (e.g. FPA_ID)? 

If we look into Ralph Kimball's `The Data Warehouse ETL Toolkit`, we can see that these questions have already been answered.
1. Creating a separate time dimension isn't recommended. We can use the date dimension instead, and if extra precision is needed we can add a full SQL date-time stamp directly to the fact table (the `*_timestamp` columns in our case):

> We may also want to compute very precise time intervals by comparing the exact time of two fact table records. For these reasons, we recommend the design shown in Figure 5.5. The calendar day component of the precise time remains as a foreign key reference to our familiar calendar day dimension. But we also embed a full SQL date-time stamp directly in the fact table for all queries requiring the extra precision. Think of this as special kind of fact, not a dimension. In this interesting case, it is not useful to make a dimension with the minutes or seconds component of the precise time stamp, because the calculation of time intervals across fact table records becomes too messy when trying to deal with separate day and time-of-day dimensions.

2. This concept is known as a `degenerate dimension`. Some natural keys can be left as orphans during table design. It is recommended to leave them in the fact table as pseudo-foreign keys (`fpa_id` in our case):

> We insert the original order number directly into the fact table as if it were a dimension key. We could have made a separate dimension out of this order number, but it would have turned out to contain only the order number, nothing else. For this reason, we give this natural key of the parent a special status and call it a degenerate (or empty) dimension.

### USA wildfires fact table
Let's create a fact table using all prepared dimensions:

In [33]:
val disc_dates = dates.toDF(dates.columns.map(name => "disc_" + name):_*)
val cont_dates = dates.toDF(dates.columns.map(name => "cont_" + name):_*)

disc_dates = [disc_date_dim_id: bigint, disc_date_type: string ... 14 more fields]
cont_dates = [cont_date_dim_id: bigint, cont_date_type: string ... 14 more fields]


[cont_date_dim_id: bigint, cont_date_type: string ... 14 more fields]

In [34]:
val stagedDFwithDates = stagedDF
    .withColumn("disc_date_actual", $"discovery_timestamp".cast("date"))
    .withColumn("cont_date_actual", $"cont_timestamp".cast("date"))

stagedDFwithDates = [objectid: int, fod_id: int ... 34 more fields]


[objectid: int, fod_id: int ... 34 more fields]

Create the fact table using null-safe joins (`<=>`), so that rows whose dimension attributes are null still match their dimension record:

In [35]:
%%dataframe --limit 5
val firesFact = stagedDFwithDates
    .join(broadcast(disc_dates), stagedDFwithDates("disc_date_actual") <=> disc_dates("disc_date_actual"), "inner")
    .join(broadcast(cont_dates), stagedDFwithDates("cont_date_actual") <=> cont_dates("cont_date_actual"), "inner")
    .join(broadcast(sources), 
          stagedDFwithDates("source_system_type") <=> sources("source_system_type") &&
          stagedDFwithDates("source_system") <=> sources("source_system") &&
          stagedDFwithDates("source_reporting_unit") <=> sources("source_reporting_unit") &&
          stagedDFwithDates("source_reporting_unit_name") <=> sources("source_reporting_unit_name"),
         "inner")
    .join(broadcast(reports),    
          stagedDFwithDates("nwcg_reporting_agency") <=> reports("nwcg_reporting_agency") &&
          stagedDFwithDates("nwcg_reporting_unit_id") <=> reports("nwcg_reporting_unit_id") &&
          stagedDFwithDates("nwcg_reporting_unit_name") <=> reports("nwcg_reporting_unit_name"),
         "inner")
    .join(fireNames,
          stagedDFwithDates("fire_name") <=> fireNames("fire_name") &&
          stagedDFwithDates("ics_209_name") <=> fireNames("ics_209_name") &&
          stagedDFwithDates("fire_code") <=> fireNames("fire_code") &&
          stagedDFwithDates("mtbs_fire_name") <=> fireNames("mtbs_fire_name"),
         "inner")
    .join(broadcast(fireCauses),
          stagedDFwithDates("stat_cause_code") <=> fireCauses("stat_cause_code") &&
          stagedDFwithDates("stat_cause_descr") <=> fireCauses("stat_cause_descr"),
         "inner")
    .join(broadcast(fireSizes),
          stagedDFwithDates("fire_size_class") <=> fireSizes("fire_size_class"),
          "inner"
         )
    .join(broadcast(owners),
          stagedDFwithDates("owner_code") <=> owners("owner_code") &&
          stagedDFwithDates("owner_descr") <=> owners("owner_descr"),
          "inner"
         )
    .join(broadcast(locations),
          stagedDFwithDates("state") <=> locations("state") &&
          stagedDFwithDates("fips_name") <=> locations("fips_name"),  
          "inner"
         )
    .select(
        $"objectid" as "fire_id", 
        $"fpa_id",
        $"disc_date_dim_id" as "discovery_date_id",
        $"cont_date_dim_id" as "cont_date_id",
        $"source_id",
        $"reporter_id",
        $"fire_name_id",
        $"fire_cause_id",
        $"fire_size_id",  
        $"owner_id",
        $"location_id",
        $"fire_size",
        $"latitude",
        $"fire_year",
        $"longitude",
        $"discovery_timestamp".cast("string"),
        $"cont_timestamp".cast("string")
)

firesFact = [fire_id: int, fpa_id: string ... 15 more fields]


fire_id,fpa_id,discovery_date_id,cont_date_id,source_id,reporter_id,fire_name_id,fire_cause_id,fire_size_id,owner_id,location_id,fire_size,latitude,fire_year,longitude,discovery_timestamp,cont_timestamp
1869013,HIWMO-MA1857,6110,10958,1944,1502,76,13,6,15,543,2839.0,20.6178112,2006,-156.2554779,2006-09-23 00:00:00,
1839763,SFO-2015NENFS20430,9193,9193,2615,1615,168,9,5,15,1667,500.0,41.98677,2015,-99.869,2015-03-03 12:45:00,2015-03-03 20:00:00
1145328,AK-MSS-33452,7483,7483,1940,1123,502,4,1,8,16,0.1,61.6008339,2010,-149.2397156,2010-06-27 00:00:00,2010-06-27 04:57:00
1124091,CDF_2003_54_2220_007013,5052,10958,2095,1477,852,2,1,15,191,0.1,40.43888888,2003,-121.85194444,2003-10-31 00:00:00,
1125118,CDF_1995_54_2220_126,2054,10958,2095,1477,873,9,3,15,191,10.0,40.32388888,1995,-122.43388888,1995-08-16 00:00:00,


In [36]:
firesFact.printSchema()

root
 |-- fire_id: integer (nullable = true)
 |-- fpa_id: string (nullable = true)
 |-- discovery_date_id: long (nullable = false)
 |-- cont_date_id: long (nullable = false)
 |-- source_id: long (nullable = false)
 |-- reporter_id: long (nullable = false)
 |-- fire_name_id: long (nullable = false)
 |-- fire_cause_id: long (nullable = false)
 |-- fire_size_id: long (nullable = false)
 |-- owner_id: long (nullable = false)
 |-- location_id: long (nullable = false)
 |-- fire_size: decimal(10,2) (nullable = true)
 |-- latitude: decimal(11,8) (nullable = true)
 |-- fire_year: integer (nullable = true)
 |-- longitude: decimal(11,8) (nullable = true)
 |-- discovery_timestamp: string (nullable = true)
 |-- cont_timestamp: string (nullable = true)
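
The `<=>` operator used in the joins above is Spark's null-safe equality: unlike `===`, it treats two NULLs as equal, so a fire without a containment date can still find a row in the date dimension. The following is a minimal plain-Scala sketch of that behaviour, not the Spark implementation. `Option` stands in for a nullable column, the object and function names are hypothetical, and the placeholder dimension row for NULL (id 10958) is an assumption inferred from the sample output above.

```scala
object NullSafeJoinSketch {
  // A nullable join key: None plays the role of SQL NULL.
  type Key = Option[String]

  // Standard SQL equality: NULL on either side yields NULL (unknown),
  // which a join condition treats as "no match".
  def sqlEq(left: Key, right: Key): Option[Boolean] =
    for { l <- left; r <- right } yield l == r

  // Null-safe equality: never yields NULL, and two NULLs compare as equal.
  def nullSafeEq(left: Key, right: Key): Boolean = left == right

  // (fire_id, cont_date_actual): fire 2 has no containment date.
  val facts: Seq[(Int, Key)] = Seq((1, Some("2015-03-03")), (2, None))

  // (cont_date_dim_id, cont_date_actual): the NULL row is an assumed placeholder.
  val dates: Seq[(Long, Key)] = Seq((9193L, Some("2015-03-03")), (10958L, None))

  // Inner join of facts to dates under the given key-matching rule.
  def innerJoin(matches: (Key, Key) => Boolean): Seq[(Int, Long)] =
    for {
      (fireId, factDate) <- facts
      (dateId, dimDate) <- dates
      if matches(factDate, dimDate)
    } yield (fireId, dateId)

  val withStandardEq: Seq[(Int, Long)] = innerJoin((l, r) => sqlEq(l, r).contains(true))
  val withNullSafeEq: Seq[(Int, Long)] = innerJoin(nullSafeEq)
}
```

With standard equality the inner join keeps only fire 1. With null-safe equality fire 2 is kept as well and mapped to the placeholder row.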



### USA weather outliers fact table

In [155]:
val weather_dates = dates.toDF(dates.columns.map(name => "weather_" + name):_*)

weather_dates = [weather_date_dim_id: bigint, weather_date_type: string ... 14 more fields]


[weather_date_dim_id: bigint, weather_date_type: string ... 14 more fields]

In [156]:
val weatherOutliersFactwithDates = stagedWeatherDF
    .withColumn("weather_date_actual", $"date_str")

weatherOutliersFactwithDates = [date_str: date, degrees_from_mean: string ... 7 more fields]


[date_str: date, degrees_from_mean: string ... 7 more fields]

In [163]:
%%dataframe --limit 10
val weatherOutliersFact = weatherOutliersFactwithDates
    .join(broadcast(weather_dates), 
          weatherOutliersFactwithDates("weather_date_actual") <=> weather_dates("weather_date_actual"), "inner")
    .join(broadcast(weatherTypes), 
          weatherOutliersFactwithDates("type") <=> weatherTypes("type"), "inner")
    .select(
        $"serialid" as "weather_outlier_id",
        $"weather_date_dim_id",
        $"weather_type_id",
        $"longitude",
        $"latitude",
        $"max_temp",
        $"min_temp"
)

weatherOutliersFact = [weather_outlier_id: string, weather_date_dim_id: bigint ... 5 more fields]


weather_outlier_id,weather_date_dim_id,weather_type_id,longitude,latitude,max_temp,min_temp
287175,6881,4,-99.0383,45.4478,20.0,0.0
300563,4553,4,-106.4278,36.2403,35.6,13.3
300564,4553,4,-84.9767,45.3725,31.1,20.6
300565,4553,2,-117.9528,36.1389,40.6,21.7
300566,4553,4,-110.395,40.1678,29.4,18.3
300567,4553,4,-95.2481,48.2325,25.6,21.1
300568,4553,4,-111.0125,39.2078,31.1,15.6
300569,4553,4,-106.3169,39.2292,26.1,0.0
300570,4553,4,-108.7339,39.1014,36.7,20.0
300571,4553,4,-100.9447,39.1131,40.0,16.7


### 4.2 Data Quality Checks
Open `CapstoneDEND/athena/athena.ipynb` and run the data quality checks using Amazon Athena.
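
The actual checks live in the Athena notebook and may differ from what follows. As a hedged illustration of the kind of checks that are commonly run against a fact table (non-empty table, unique surrogate key, no missing lookup ids), here is a small plain-Scala sketch over an in-memory sample. The `FireRow` case class and all function names are hypothetical and cover only three of the `fires_fact` columns.

```scala
object DataQualitySketch {
  // Hypothetical in-memory stand-in for a few fires_fact columns.
  final case class FireRow(
      fireId: Int,
      discoveryDateId: Option[Long],
      locationId: Option[Long]
  )

  // Check 1: the table contains at least one row.
  def nonEmpty(rows: Seq[FireRow]): Boolean = rows.nonEmpty

  // Check 2: the surrogate key is unique.
  def uniqueKeys(rows: Seq[FireRow]): Boolean =
    rows.map(_.fireId).distinct.size == rows.size

  // Check 3: every lookup column is populated.
  def noMissingLookups(rows: Seq[FireRow]): Boolean =
    rows.forall(r => r.discoveryDateId.isDefined && r.locationId.isDefined)

  // Runs all checks and returns the names of the ones that failed.
  def failedChecks(rows: Seq[FireRow]): Seq[String] =
    Seq(
      "non-empty table" -> nonEmpty(rows),
      "unique fire_id" -> uniqueKeys(rows),
      "no missing lookup ids" -> noMissingLookups(rows)
    ).collect { case (name, passed) if !passed => name }

  // Two rows taken from the sample output of the fact table.
  val sample: Seq[FireRow] = Seq(
    FireRow(1869013, Some(6110L), Some(543L)),
    FireRow(1839763, Some(9193L), Some(1667L))
  )
}
```

Calling `DataQualitySketch.failedChecks(DataQualitySketch.sample)` returns an empty sequence, while a sample with a duplicated `fireId` or a missing lookup id reports the corresponding check by name.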

### 4.3 Data dictionary 
Dimensions:
* `sources` - traces a record back to the original source from which usa_wildfires was compiled

    * `source_system_type` - type of source database or system that the record was drawn from (federal, nonfederal, or interagency)
    * `source_system` - name of or other identifier for source database or system that the record was drawn from
    * `source_reporting_unit` - code for the agency unit preparing the fire report, based on code/name in the source dataset
    * `source_reporting_unit_name` - name of reporting agency unit preparing the fire report, based on code/name in the source dataset
    
    
* `reporters` - lists the agencies that reported a wildfire
    * `nwcg_reporting_agency` - active National Wildfire Coordinating Group (NWCG) Unit Identifier for the agency preparing the fire report
    * `nwcg_reporting_unit_id` - active NWCG Unit Identifier for the unit preparing the fire report
    * `nwcg_reporting_unit_name` - active NWCG Unit Name for the unit preparing the fire report


* `fire_names` - names of wildfires (if any)
    * `fire_name` - name of the incident, from the fire report (primary) or ICS-209 report (secondary)
    * `ics_209_name` - name of the incident, from the ICS-209 report
    * `fire_code` - code used within the interagency wildland fire community to track and compile cost information for emergency fire suppression
    * `mtbs_fire_name` - name of the incident, from the MTBS perimeter dataset
    

* `fire_causes` - causes of fires and their codes
    * `stat_cause_code` - code for the (statistical) cause of the fire
    * `stat_cause_descr` - description of the (statistical) cause of the fire
    

* `fire_sizes` - codes for fire size based on the number of acres within the final fire perimeter
    * `fire_size_class` - code for fire size based on the number of acres within the final fire perimeter
    * `lower_bound` - lower bound of the amount of acres in a class
    * `upper_bound` - upper bound of the amount of acres in a class
    
    
* `locations` - locations of a wildfire
    * `state` - state where a wildfire occurred
    * `fips_name` - county name
 
 
* `owners` - codes for primary owners or entities responsible for managing the land at the point of origin of the fire
    * `owner_code` - code for primary owner or entity responsible for managing the land at the point of origin of the fire at the time of the incident
    * `owner_descr` - name of primary owner or entity responsible for managing the land at the point of origin of the fire at the time of the incident
    

* `weather_types` - codes for weather types
    * `type` - type of variation, one of 'weak cold', 'weak hot', 'strong hot', 'strong cold'
    
Fact tables:
* `fires_fact` - main table with wildfire measurements
    * `fire_id` - surrogate key of the table
    * `fpa_id` - unique identifier that contains information necessary to track back to the original record in the source dataset 
    * `discovery_date_id` - look up column to `dates` table for wildfires discovery dates
    * `cont_date_id` - look up column to `dates` table for wildfires containment dates
    * `source_id` - look up column to `sources` table
    * `reporter_id` - look up column to `reporters` table
    * `fire_name_id` - look up column to `fire_names` table
    * `fire_size_id` - look up column to `fire_sizes` table
    * `fire_cause_id` - look up column to `fire_causes` table
    * `owner_id` - look up column to `owners` table
    * `location_id` - look up column to `locations` table
    * `fire_size` - size of the wildfire in acres
    * `latitude` - latitude
    * `longitude` - longitude
    * `fire_year` - year of the wildfire
    * `discovery_timestamp` - string with the date and time when a wildfire was discovered
    * `cont_timestamp` - string with the date and time when a wildfire was contained


* `weather_fact` - main table with weather outliers measurements
    * `weather_outlier_id` - surrogate key of the table
    * `weather_date_dim_id` - look up column to `dates` table
    * `weather_type_id` - look up column to `weather_types` table
    * `longitude` - longitude
    * `latitude` - latitude
    * `max_temp` - daily high
    * `min_temp` - daily low

## Step 5: Complete Project Write Up
#### Clearly state the rationale for the choice of tools and technologies for the project.

Before stating the rationale, let's recap which technologies have been used throughout the project:
* Apache Spark
* AWS S3
* AWS Athena


1. Distributed vs non-distributed

The choice of distributed technologies (Apache Spark, AWS S3, AWS Athena) is driven by the need to guarantee data durability and long-term scalability. The volume of data used in this project does not exceed the capacity of a single machine, so the star schema could have been laid out in a local PostgreSQL instance without the overhead of writing a Scala Spark job. However, once a Big Data infrastructure is in place, it can absorb growing volumes of data without remodelling the data architecture and without incurring migration costs.

2. Amazon Athena vs Amazon Redshift as a query engine

Although a data warehouse such as Amazon Redshift provides faster queries, it is up to the end user to decide whether they need a full-fledged Amazon Redshift cluster, which has to be scaled up every time the amount of data exceeds its limits, or whether they can accept less performant queries in exchange for paying as they go: $5 per 1 TB of data scanned by Amazon Athena. My conclusion is that sooner or later a data warehouse will have to store more data than it can handle gracefully, and the need for SQL querying over data stored in S3 will arise, either in the form of Amazon Redshift Spectrum or Amazon Athena. It is therefore reasonable to use Amazon Athena right from the start without overpaying for Redshift, given that the Presto SQL engine supports pretty much every piece of SQL functionality a data analyst needs to provide insights. A small bonus is that, at the time of writing, Presto SQL supports GROUPING SETS and CUBE, unlike Amazon Redshift SQL. Both are really handy for building data marts as Amazon Athena views for Tableau reporting.
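
To make the pay-as-you-go argument concrete, the arithmetic can be sketched in a few lines of plain Scala. This is only a back-of-the-envelope estimate: it uses the $5 per TB figure quoted above, ignores any other pricing rules, and the object and function names are hypothetical.

```scala
object AthenaCostSketch {
  // Price quoted in the text: 5 USD per 1 TB of data scanned.
  val UsdPerTbScanned: BigDecimal = BigDecimal(5)

  // Estimated cost of scanning the given number of terabytes.
  def costUsd(tbScanned: BigDecimal): BigDecimal = tbScanned * UsdPerTbScanned

  // Example: queries that scan 0.002 TB in total, and the same workload on 100x the data.
  val today: BigDecimal = costUsd(BigDecimal("0.002"))
  val at100x: BigDecimal = costUsd(BigDecimal("0.002") * 100)
}
```

Under these assumptions the cost grows linearly with the data scanned (0.01 USD today versus 1 USD at 100x), which is why reducing the amount of data each query scans matters more than the raw size of the lake.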

#### Propose how often the data should be updated and why

I would recommend that the source system generate its output on demand: data should be generated and processed whenever a wildfire occurs.

#### Write a description of how you would approach the problem differently under the following scenarios:
Questions:
1. The data was increased by 100x.
2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
3. The database needed to be accessed by 100+ people.

Answers:
1. By choosing to build a datalake on S3 with Apache Spark as the processing engine and Amazon Athena as the query engine, we have already laid the foundation for gracefully handling a 100x increase in data. If the data is reasonably partitioned and stored in a columnar format such as Parquet or ORC, a 100x increase should go largely unnoticed.

2. For daily incremental uploads it would be reasonable to use a workflow management system such as Apache Airflow, with its EmrCreateJobFlowOperator for running Spark jobs, scheduled so that the run finishes before the 7am deadline.

3. Amazon Athena gives 100+ people easy access, unlike Amazon Redshift, where you need to fine-tune workload management query queues. If you still need a warehouse that can organically serve hundreds of analysts, Vertica might be worth a look.
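
Answer 1 relies on the data being "reasonably partitioned". As a hedged sketch of what that could mean, the plain-Scala snippet below builds Hive-style partition directories (`column=value`), the layout that Spark's `partitionBy` writes and that Athena can prune on. The bucket name, the choice of `fire_year` as the partition column, and all function names are assumptions made for illustration only.

```scala
object PartitionLayoutSketch {
  // Hypothetical location; the real bucket and prefix are not given in the text.
  val base: String = "s3://example-datalake/fires_fact"

  // Hive-style partition directory for one value of the partition column.
  def partitionPath(fireYear: Int): String = s"$base/fire_year=$fireYear/"

  // Partition pruning: a query filtered on fire_year only has to read
  // the directories whose value satisfies the filter.
  def prune(years: Seq[Int], wanted: Int => Boolean): Seq[String] =
    years.filter(wanted).map(partitionPath)

  // Years that appear in the sample output of the fact table.
  val sampleYears: Seq[Int] = Seq(1995, 2003, 2006, 2010, 2015)
}
```

For example, `PartitionLayoutSketch.prune(PartitionLayoutSketch.sampleYears, _ >= 2010)` selects only the `fire_year=2010/` and `fire_year=2015/` directories, so a query restricted to recent years scans a fraction of the table no matter how much historical data accumulates.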