In [1]:
catalog.list()

In [2]:
companies = catalog.load("companies")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/06 12:11:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
companies.show()

+-----+--------------+--------------------+-----------------+-------------+
|   id|company_rating|    company_location|total_fleet_count|iata_approved|
+-----+--------------+--------------------+-----------------+-------------+
|35029|          100%|                Niue|              4.0|            f|
|30292|           67%|            Anguilla|              6.0|            f|
|19032|           67%|  Russian Federation|              4.0|            f|
| 8238|           91%|            Barbados|             15.0|            t|
|30342|          null|Sao Tome and Prin...|              2.0|            t|
|32413|          100%|       Faroe Islands|              1.0|            f|
|35620|           90%|          Micronesia|              3.0|            f|
|23820|          null|              Rwanda|              1.0|            t|
|46528|          100%|          Uzbekistan|              3.0|            t|
|11875|          100%|          Micronesia|              2.0|            t|
|21120|     

In [5]:
pipelines

In [4]:
import pandas as pd
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import udf,col
from pyspark.sql.types import BooleanType, FloatType

@udf(returnType=BooleanType())
def _is_true(x: str) -> bool:
    return x == "t"

@udf(returnType=FloatType())
def _parse_percentage(x: str) -> float:
    if isinstance(x,str):
        x = x.replace("%", "")
        x = float(x) / 100
    return x

@udf(returnType=FloatType())
def _parse_money(x: str) -> float:
    if isinstance(x,str):
        x = x.replace("$", "").replace(",", "")
        x = float(x)
    return x


def preprocess_companies(companies: DataFrame) -> DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    #companies["iata_approved"] = _is_true(companies["iata_approved"])
    #companies["company_rating"] = _parse_percentage(companies["company_rating"])
    companies = companies.withColumn("iata_approved", _is_true(companies.iata_approved))
    companies = companies.withColumn("company_rating", _parse_percentage(companies.company_rating))
    companies.show()
    return companies

In [5]:
cpx = preprocess_companies(companies)

[Stage 3:>                                                          (0 + 1) / 1]

+-----+--------------+--------------------+-----------------+-------------+
|   id|company_rating|    company_location|total_fleet_count|iata_approved|
+-----+--------------+--------------------+-----------------+-------------+
|35029|           1.0|                Niue|              4.0|        false|
|30292|          0.67|            Anguilla|              6.0|        false|
|19032|          0.67|  Russian Federation|              4.0|        false|
| 8238|          0.91|            Barbados|             15.0|         true|
|30342|          null|Sao Tome and Prin...|              2.0|         true|
|32413|           1.0|       Faroe Islands|              1.0|        false|
|35620|           0.9|          Micronesia|              3.0|        false|
|23820|          null|              Rwanda|              1.0|         true|
|46528|           1.0|          Uzbekistan|              3.0|         true|
|11875|           1.0|          Micronesia|              2.0|         true|
|21120|     

                                                                                

In [6]:
companies.show()

+-----+--------------+--------------------+-----------------+-------------+
|   id|company_rating|    company_location|total_fleet_count|iata_approved|
+-----+--------------+--------------------+-----------------+-------------+
|35029|          100%|                Niue|              4.0|            f|
|30292|           67%|            Anguilla|              6.0|            f|
|19032|           67%|  Russian Federation|              4.0|            f|
| 8238|           91%|            Barbados|             15.0|            t|
|30342|          null|Sao Tome and Prin...|              2.0|            t|
|32413|          100%|       Faroe Islands|              1.0|            f|
|35620|           90%|          Micronesia|              3.0|            f|
|23820|          null|              Rwanda|              1.0|            t|
|46528|          100%|          Uzbekistan|              3.0|            t|
|11875|          100%|          Micronesia|              2.0|            t|
|21120|     

In [7]:
def preprocess_shuttles(shuttles: pd.DataFrame) -> DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(shuttles)
    sdf = sdf.withColumn("d_check_complete", _is_true(sdf.d_check_complete))
    sdf = sdf.withColumn("moon_clearance_complete", _is_true(sdf.moon_clearance_complete))
    sdf = sdf.withColumn("price", _parse_money(sdf.price))
    #shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    #shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    #shuttles["price"] = _parse_money(shuttles["price"])
    return sdf

In [8]:
shuttles = catalog.load("shuttles")

In [9]:
frshut = preprocess_shuttles(shuttles)

In [10]:
frshut.show()

+-----+--------------------+------------+-----------+------------------+-------+------------------+-------------------+----+----------------+-----------------------+------+----------+
|   id|    shuttle_location|shuttle_type|engine_type|     engine_vendor|engines|passenger_capacity|cancellation_policy|crew|d_check_complete|moon_clearance_complete| price|company_id|
+-----+--------------------+------------+-----------+------------------+-------+------------------+-------------------+----+----------------+-----------------------+------+----------+
|63561|                Niue|     Type V5|    Quantum|ThetaBase Services|    1.0|                 2|             strict| 1.0|           false|                  false|1325.0|     35029|
|36260|            Anguilla|     Type V5|    Quantum|ThetaBase Services|    1.0|                 2|             strict| 1.0|            true|                  false|1780.0|     30292|
|57015|  Russian Federation|     Type V5|    Quantum|ThetaBase Services|    1.0|

In [11]:
reviews = catalog.load("reviews")

In [12]:
reviews.show()

+----------+--------------------+---------------------+-----------------------+------------------+------------------+----------------------+-------------------+-----------------+-----------------+
|shuttle_id|review_scores_rating|review_scores_comfort|review_scores_amenities|review_scores_trip|review_scores_crew|review_scores_location|review_scores_price|number_of_reviews|reviews_per_month|
+----------+--------------------+---------------------+-----------------------+------------------+------------------+----------------------+-------------------+-----------------+-----------------+
|     63561|                97.0|                 10.0|                    9.0|              10.0|              10.0|                   9.0|               10.0|              133|             1.65|
|     36260|                90.0|                  8.0|                    9.0|              10.0|               9.0|                   9.0|                9.0|                3|             0.09|
|     57015|   

In [13]:
rated_shuttles = frshut.join(reviews, frshut.id == reviews.shuttle_id, "right")

In [14]:
rated_shuttles.show()

[Stage 9:>                (0 + 10) / 10][Stage 10:>                 (0 + 0) / 1]

+-----+--------------------+------------+-----------+------------------+-------+------------------+-------------------+----+----------------+-----------------------+------+----------+----------+--------------------+---------------------+-----------------------+------------------+------------------+----------------------+-------------------+-----------------+-----------------+
|   id|    shuttle_location|shuttle_type|engine_type|     engine_vendor|engines|passenger_capacity|cancellation_policy|crew|d_check_complete|moon_clearance_complete| price|company_id|shuttle_id|review_scores_rating|review_scores_comfort|review_scores_amenities|review_scores_trip|review_scores_crew|review_scores_location|review_scores_price|number_of_reviews|reviews_per_month|
+-----+--------------------+------------+-----------+------------------+-------+------------------+-------------------+----+----------------+-----------------------+------+----------+----------+--------------------+---------------------+-----

                                                                                

In [27]:
    model_input_table = rated_shuttles.join(cpx,
        rated_shuttles.company_id == cpx.id, "inner"
    )

In [28]:
model_input_table.count()

In [35]:
model_input_table = model_input_table.drop("id")

In [36]:
model_input_table = model_input_table.na.drop()

In [37]:
model_input_table.count()

In [38]:
pipelines