# Installation

Check what will be installed:

In [None]:
!cd .. && git log -n 3

In [None]:
!pip uninstall -y wmfdata

In [None]:
!pip install -e ..

In [None]:
!pip install pyarrow

# Setup

In [None]:
import os

import pandas as pd
import wmfdata as wmf

import findspark
# Keep this call ahead of `import pyspark`: it points findspark at the Spark
# installation in /usr/lib/spark2 (presumably pyspark is not importable in this
# environment until that has been done -- TODO confirm).
findspark.init("/usr/lib/spark2")
import pyspark

def assert_dataframes_match(df1, df2):
  """Assert that two pandas data frames have the same contents and labels.

  Args:
    df1: The expected data frame.
    df2: The data frame to compare against it.

  Raises:
    AssertionError: If the contents, the index, or the column labels differ.
      The message names the check that failed and shows both sides.
  """
  # The checks run in the same order as before; only the messages are new, so
  # a failing comparison explains itself instead of raising a bare
  # AssertionError. The first message uses repr() so it can be built for any
  # pair of objects.
  assert df1.equals(df2), (
    f"Data frames differ in values or dtypes.\nExpected:\n{df1!r}\nActual:\n{df2!r}"
  )
  assert df1.index.equals(df2.index), (
    f"Indexes differ.\nExpected:\n{df1.index!r}\nActual:\n{df2.index!r}"
  )
  assert df1.columns.equals(df2.columns), (
    f"Column labels differ.\nExpected:\n{df1.columns!r}\nActual:\n{df2.columns!r}"
  )

In [None]:
# Statements that return no rows when run in this order: the test table is
# dropped, recreated empty, and then selected from.
COMMANDS_WITH_EMPTY_OUTPUT = [
  # Drop any existing copy so the CREATE TABLE below starts from scratch
  "DROP TABLE IF EXISTS neilpquinn.wmfdata_test_3",
  # Schema matches test_data_1
  """
  CREATE TABLE `neilpquinn.wmfdata_test_3`(
    `month` TIMESTAMP, 
    `wiki` STRING, 
    `user_id` BIGINT,
    `user_name` STRING, 
    `edits` BIGINT,
    `content_edits` BIGINT,
    `user_registration` TIMESTAMP
  )
  """,
  # Nothing has been inserted into the new table, so this selects zero rows
  """
  SELECT *
  FROM neilpquinn.wmfdata_test_3
  """
]

# Loading data

In [None]:
# Reference copy of test dataset 1, read directly from the Parquet file with pandas
TEST_DATA_1 = pd.read_parquet("test_data_1.parquet", engine="pyarrow")

# Read the same file through a local Spark session (the file:// URL is built
# from the current working directory) and save it as a table
spark = wmf.spark.get_session(type="local", app_name="wmfdata-test")
current_directory = os.getcwd()
spark_df = spark.read.load(
  f"file://{current_directory}/test_data_1.parquet"
)
(
  spark_df.write
  .mode("overwrite")
  .saveAsTable("neilpquinn.wmfdata_test_1")
)

# Query against the table written above; used to compare with TEST_DATA_1
TEST_DATA_1_QUERY = """
SELECT *
FROM neilpquinn.wmfdata_test_1
"""

In [None]:
# Reference copy of test dataset 2, read directly from the CSV file with pandas.
# NOTE(review): pd.read_csv parses the literal text "NA" as a missing value by
# default; confirm the file holds no such values or that the comparison below
# is meant to see them as missing.
TEST_DATA_2 = pd.read_csv("test_data_2.csv")

# Load the same CSV file into a table, declaring every field as a string
wmf.hive.load_csv(
  "test_data_2.csv",
  ", ".join(
    f"{column} string"
    for column in ("name", "iso_code", "economic_region", "maxmind_continent")
  ),
  db_name="neilpquinn",
  table_name="wmfdata_test_2"
)

# Query against the table loaded above; used to compare with TEST_DATA_2
TEST_DATA_2_QUERY = """
SELECT *
FROM neilpquinn.wmfdata_test_2
"""

# Hive

In [None]:
# Query test table 1 through wmf.hive.run_cli and check that the result is
# identical to the pandas reference copy
test_data_1_via_hive = wmf.hive.run_cli(TEST_DATA_1_QUERY)
assert_dataframes_match(TEST_DATA_1, test_data_1_via_hive)

In [None]:
# Query test table 2 through wmf.hive.run_cli and check that the result is
# identical to the pandas reference copy
test_data_2_via_hive = wmf.hive.run_cli(TEST_DATA_2_QUERY)
assert_dataframes_match(TEST_DATA_2, test_data_2_via_hive)

In [None]:
# A list of commands that return no rows should still produce a pandas data
# frame, and that data frame should be empty
empty_output = wmf.hive.run_cli(COMMANDS_WITH_EMPTY_OUTPUT)
assert empty_output.empty

# Spark

In [None]:
# Query test table 1 through wmf.spark.run and check that the result is
# identical to the pandas reference copy
test_data_1_via_spark = wmf.spark.run(TEST_DATA_1_QUERY)
assert_dataframes_match(TEST_DATA_1, test_data_1_via_spark)

# MariaDB

In [None]:
# Column names mirror those of TEST_DATA_1's Hive test schema, with the two
# timestamp columns declared as VARCHAR here
CREATE_TABLE_SQL = """
CREATE OR REPLACE TABLE wmfdata_test_4 (
  month VARCHAR(255),
  wiki VARCHAR(255), 
  user_id BIGINT,
  user_name VARCHAR(255), 
  edits BIGINT,
  content_edits BIGINT,
  user_registration VARCHAR(255)
  )
"""

# Render every row of TEST_DATA_1 as the text of a plain Python tuple and join
# the rows with commas, producing the VALUES list of the INSERT statement.
# NOTE(review): this relies on each tuple's text being a valid SQL row; a value
# such as None or NaN would not be -- confirm the test data contains none.
test_data_tuples = TEST_DATA_1.itertuples(index=False, name=None)
test_data_1_records = ",\n".join(map(str, test_data_tuples))

INSERT_RECORDS_SQL = f"""
INSERT INTO
wmfdata_test_4 
VALUES
{test_data_1_records}
"""

# Create the table, then fill it, in the staging database
wmf.mariadb.run([CREATE_TABLE_SQL, INSERT_RECORDS_SQL], dbs=["staging"])

In [None]:
# Read the table back from the staging database through wmf.mariadb.run and
# check that the result is identical to the pandas reference copy
test_data_1_via_mariadb = wmf.mariadb.run(
  """
  SELECT *
  FROM 
  wmfdata_test_4
  """, 
  "staging"
)

assert_dataframes_match(TEST_DATA_1, test_data_1_via_mariadb)