# Online Data Science Courses. Preprocessing pipeline

## Data Science courses scraped from popular online educational platforms


### Context

Online educational platforms offer a vast number of courses, and self-taught beginners in Data Science often find it hard to choose one to start with. This dataset was collected to help answer the common questions that come up when choosing a new course of study.


### Acknowledgements

Data was collected via web scraping from popular online platforms: [Coursera](https://www.coursera.org), [Stepik](https://stepik.org), [Udemy](https://www.udemy.com), [edX](https://www.edx.org), [Pluralsight](https://www.pluralsight.com), [Alison](https://alison.com), [FutureLearn](https://www.futurelearn.com), and [Skillshare](https://www.skillshare.com). Only courses related to the "Data Science" topic were queried from each platform. The original author of the [image thumbnail](https://unsplash.com/photos/Im7lZjxeLhg) is [Ales Nesetril](https://unsplash.com/@alesnesetril).


### Inspiration

The primary intent behind collecting course data is to discover which online platform provides the highest educational quality. Further analysis should also answer questions such as "Does a paid course provide higher quality than a free one?" or "Which platform is the most suitable for beginners?".

### Pipeline rules

 1. Replace `None` and `All Levels` in `Level` with `Mixed`
 2. Valid level values are: `Advanced`, `Mixed`, `Intermediate`, `Beginner`
 3. Merge multiple authors into a single column, separated by the ' ' symbol
 4. Exclude the `Description` column (for V1)
 5. Store `Votes_count` as an integer without the `,` symbol
 6. Store `Student_count` as an integer without the `,` symbol
 7. Replace the `Introductory` level with `Beginner`
 8. Remove `\n`, `\r`, and `\t` symbols from the `Title` column
 9. Normalize the `Duration` column to an hour count:
     - The raw `Skillshare` value is in seconds
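
The level, count, and title rules above can be sketched in plain Python. This is a minimal illustration rather than the notebook's implementation: the names `LEVEL_ALIASES`, `canonical_level`, `parse_count`, and `clean_title` are hypothetical, and the alias table only covers the spellings named in the rules.

```python
from typing import Optional

# Allowed output values (rule 2).
VALID_LEVELS = {"Advanced", "Mixed", "Intermediate", "Beginner"}

# Hypothetical alias table covering rules 1 and 7.
LEVEL_ALIASES = {"All Levels": "Mixed", "Introductory": "Beginner"}


def canonical_level(level: Optional[str]) -> str:
    """Map a raw level to one of VALID_LEVELS, raising on unknown values."""
    if level is None:
        return "Mixed"  # rule 1: a missing level becomes Mixed
    level = level.strip()
    level = LEVEL_ALIASES.get(level, level)
    if level not in VALID_LEVELS:
        raise ValueError(f"unexpected level: {level!r}")
    return level


def parse_count(raw: Optional[str]) -> Optional[int]:
    """Turn a string such as '12,345' into an integer (rules 5 and 6)."""
    if raw is None:
        return None
    return int(raw.replace(",", ""))


def clean_title(title: str) -> str:
    """Collapse every whitespace run, including newlines, carriage returns,
    and tabs, into a single space (a slightly stricter form of rule 8)."""
    return " ".join(title.split())
```

Raising on an unknown level, instead of passing it through, makes a new platform-specific spelling visible as soon as it appears in the scraped data.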
 

### Scraped URLs

 - [Coursera](https://www.coursera.org/search?page=1&index=prod_all_launched_products_term_optimization&topic=Data%20Science)
 - [Stepik](https://stepik.org/catalog/search?lang=en&page=1&q=Data%20Science)
 - [Udemy](https://www.udemy.com/courses/development/data-science/?p=1)
 - [edX](https://www.edx.org/search?language=English&subject=Data%20Analysis%20%26%20Statistics&tab=course)
 - [Pluralsight](https://www.pluralsight.com/search?q=Data%20Science&categories=course)
 - [Alison](https://alison.com/tag/data-science?page=1)
 - [FutureLearn](https://www.futurelearn.com/subjects/science-engineering-and-maths-courses/data-science)
 - [Skillshare](https://www.skillshare.com/search?query=Data%20Science)
 - [LinkedIn](https://www.linkedin.com/learning/topics/data-science)



## Environment preparation

Creating Spark session

In [None]:
!spark-shell --version 2>&1 | sed -n '8,15'p

In [None]:
from pyspark.sql import SparkSession
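# Explicit imports avoid shadowing Python builtins such as sum, round, and max
from pyspark.sql.functions import col, rand, udf
from pyspark.sql.types import (
    ArrayType, BooleanType, DoubleType, IntegerType, StringType, StructType
)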

spark = SparkSession.builder \
    .appName("Online Data Science Courses")\
    .master("local[*]")\
    .config("spark.driver.memory","40G")\
    .config("spark.driver.maxResultSize", "0")\
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .getOrCreate()

spark

## Coursera pipeline preprocessing

In [None]:
schema = StructType()\
            .add("title", StringType(), True)\
            .add("description", StringType(), True)\
            .add("authors", ArrayType(StringType()), True)\
            .add("rating", DoubleType(), True)\
            .add("votes_count", StringType(), True)\
            .add("students_count", StringType(), True)\
            .add("level", StringType(), True)\
            .add("duration", StringType(), True)\
            .add("platform", StringType(), True)\
            .add("free", BooleanType(), True)

coursera_dataset = spark.read.format("json")\
            .schema(schema)\
            .load("../input/coursera.json")

coursera_dataset.toPandas().head()

In [None]:
coursera_dataset.toPandas().count()

In [None]:
import re


re_digits_in_words = re.compile(r'\b(\w)*(\d)(\w)*\b')
re_redundant_spaces = re.compile(r'[ ]+')


def preprocessing_text_pipeline(text: str) -> str:
    if text is None:
        return None

    steps = [
        lambda text: text.replace('\n', ' ').replace('\r', ' ').replace('\t', ' '),
        # Remove words that contain digits
        lambda text: re_digits_in_words.sub('', text),
        # Remove redundant spaces between each word
        lambda text: re_redundant_spaces.sub(' ', text),
        # Strip string
        lambda text: text.strip()
    ]
    
    for step in steps:
        text = step(text)
        
    return text


def normalize_student_count(student_count: str) -> int:
    if student_count is None:
        return None
    
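    # The last character may be a magnitude suffix: 'm' (millions) or 'k' (thousands)
    suffix = student_count[-1]
    
    if suffix == 'm':
        return int(float(student_count[:-1]) * 1000000)
    elif suffix == 'k':
        return int(float(student_count[:-1]) * 1000)
    else:
        # No suffix: parse the whole string so the last digit is not dropped
        return int(float(student_count))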

def normalize_votes_count(votes_count: str) -> int:
    if votes_count is None:
        return None
    
    valid_votes_count = votes_count[1:-1]
    valid_votes_count = valid_votes_count.replace(',', '')
    return int(valid_votes_count)


def normalize_authors(authors: list) -> str:
    if authors is None or len(authors) == 0:
        return None
    
    return authors[0]


def normalize_duration(duration: str) -> float:
    if duration is None or "Approx" not in duration:
        return None
    
    weeks_and_hours = re.findall(r'\d+', duration)
    
    if len(weeks_and_hours) > 1:
        return int(weeks_and_hours[0]) * 4 * int(weeks_and_hours[1]) * 1.0
    else:
        return int(weeks_and_hours[0]) * 1.0


preprocessing_text_pipeline_udf = udf(lambda x: preprocessing_text_pipeline(x), StringType())
normalize_authors_udf = udf(lambda x: normalize_authors(x), StringType())
normalize_student_count_udf = udf(lambda x: normalize_student_count(x), IntegerType())
normalize_votes_count_udf = udf(lambda x: normalize_votes_count(x), IntegerType())
normalize_duration_udf = udf(lambda x: normalize_duration(x), DoubleType())
    
coursera_dataset_normalize = coursera_dataset.select(
    preprocessing_text_pipeline_udf(col("title")).alias("title"),
    normalize_authors_udf(col("authors")).alias("author"),
    col("rating"),
    normalize_votes_count_udf(col("votes_count")).alias("votes_count"),
    normalize_student_count_udf(col("students_count")).alias("students_count"),
    col("level"),
    normalize_duration_udf(col("duration")).alias("duration"),
    col("platform"),
    col("free")
)

coursera_dataset_normalize.toPandas().head()

In [None]:
coursera_dataset_normalize.select(col("duration")).distinct().toPandas().hist(bins=24, alpha=0.5)
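
The suffix handling in `normalize_student_count` multiplies a float and truncates with `int`, which can lose one unit when the product lands just below a whole number because of binary rounding. The sketch below shows the same idea with `decimal.Decimal`, which keeps the arithmetic exact. It assumes raw values look like `'1.2m'`, `'45k'`, or `'980'`; the names `SUFFIX_MULTIPLIERS` and `parse_suffixed_count` are hypothetical.

```python
from decimal import Decimal
from typing import Optional

# Multipliers for the magnitude suffixes handled in the cell above.
SUFFIX_MULTIPLIERS = {"k": Decimal(1_000), "m": Decimal(1_000_000)}


def parse_suffixed_count(raw: Optional[str]) -> Optional[int]:
    """Convert strings such as '1.2m', '45k', or '980' to an integer."""
    if raw is None:
        return None

    raw = raw.strip().lower()
    if not raw:
        return None

    multiplier = SUFFIX_MULTIPLIERS.get(raw[-1])
    if multiplier is None:
        # No known suffix: the whole string is the number.
        return int(Decimal(raw))

    # Decimal parses '1.2' exactly, so the product has no rounding error.
    return int(Decimal(raw[:-1]) * multiplier)
```

Wrapped in `udf(parse_suffixed_count, IntegerType())`, this could stand in for the float-based version, at the cost of raising on strings that contain thousands separators.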

## Stepik pipeline preprocessing

In [None]:
schema = StructType()\
            .add("title", StringType(), True)\
            .add("authors", ArrayType(StringType()), True)\
            .add("rating", DoubleType(), True)\
            .add("votes_count", DoubleType(), True)\
            .add("students_count", IntegerType(), True)\
            .add("level", StringType(), True)\
            .add("duration", DoubleType(), True)\
            .add("platform", StringType(), True)\
            .add("free", BooleanType(), True)

stepik_dataset = spark.read.format("json")\
            .schema(schema)\
            .load("../input/stepik.json")

stepik_dataset.toPandas().head()

In [None]:
stepik_dataset.toPandas().count()

In [None]:
stepik_dataset_normalize = stepik_dataset.select(
    preprocessing_text_pipeline_udf(col("title")).alias("title"),
    normalize_authors_udf(col("authors")).alias("author"),
    col("rating"),
    col("votes_count"),
    col("students_count"),
    col("level"),
    col("duration"),
    col("platform"),
    col("free")
)

stepik_dataset_normalize.toPandas().head()

## edX pipeline preprocessing

In [None]:
schema = StructType()\
            .add("title", StringType(), True)\
            .add("description", StringType(), True)\
            .add("authors", ArrayType(StringType()), True)\
            .add("rating", DoubleType(), True)\
            .add("votes_count", IntegerType(), True)\
            .add("students_count", StringType(), True)\
            .add("level", StringType(), True)\
            .add("duration", StringType(), True)\
            .add("platform", StringType(), True)\
            .add("free", BooleanType(), True)

edx_dataset = spark.read.format("json")\
            .schema(schema)\
            .load("../input/edx.json")

edx_dataset.toPandas().head()

In [None]:
edx_dataset.toPandas().count()

In [None]:
import numpy as np


def normalize_authors(authors: list) -> str:
    if authors is None or len(authors) == 0:
        return None
    
    return authors[0].replace("Institutions: ", "").replace("Institution: ", "")


def normalize_students_count(students_count: str) -> int:
    if students_count is None:
        return None
    
    students = re.findall(r'\d+', students_count)

    if len(students) == 0:
        return None
    
    return int("".join(students))


def normalize_level(level: str) -> str:
    # A missing level is reported as "Mixed" (pipeline rule 1)
    if level is None:
        return "Mixed"

    level = level.replace("Level: ", "")
    
    if level == "Introductory":
        return "Beginner"
    else:
        return level
    
    
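def normalize_duration(duration: str) -> float:
    if duration is None:
        return None

    if '\n' in duration:
        # The part before the newline is parsed as weeks, the part after it as hours
        weeks_and_hours = duration.split('\n')
        weeks, hours = weeks_and_hours[0], weeks_and_hours[1]
        
        weeks_norm = int(re.findall(r'\d+', weeks)[0])
        # Half the sum of the numbers on the hours line; computed but not used in the result below
        hours_norm = np.sum([float(int(n)) for n in re.findall(r'\d+', hours)]) / 2.0
        
        return weeks_norm * 4.0
    else:
        return int(re.findall(r'\d+', duration)[0]) * 1.0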
    

normalize_authors_udf = udf(lambda x: normalize_authors(x), StringType())
normalize_level_udf = udf(lambda x: normalize_level(x), StringType())
normalize_students_count_udf = udf(lambda x: normalize_students_count(x), IntegerType())
normalize_duration_udf = udf(lambda x: normalize_duration(x), DoubleType())

edx_dataset_normalize = edx_dataset.select(
    preprocessing_text_pipeline_udf(col("title")).alias("title"),
    normalize_authors_udf(col("authors")).alias("author"),
    col("rating"),
    col("votes_count"),
    normalize_students_count_udf(col("students_count")).alias("students_count"),
    normalize_level_udf(col("level")).alias("level"),
    normalize_duration_udf(col("duration")).alias("duration"),
    col("platform"),
    col("free")
)

edx_dataset_normalize.toPandas().head()

In [None]:
edx_dataset_normalize.select(col("duration")).distinct().collect()
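
If the goal is a total hour count, one option is to multiply the number of weeks by the midpoint of the weekly effort range. The sketch below illustrates that calculation only; it is not what `normalize_duration` above returns, and the input layout (a weeks line, a newline, then an hours-per-week line) is an assumption based on how that function splits the string. The name `estimate_total_hours` is hypothetical.

```python
import re
from typing import Optional


def estimate_total_hours(duration: Optional[str]) -> Optional[float]:
    """Estimate total hours as weeks times mean weekly hours.

    Assumed input: a weeks line and an hours-per-week line separated by a
    newline. A string without a newline is treated as a plain hour count.
    """
    if duration is None:
        return None

    if "\n" not in duration:
        match = re.search(r"\d+", duration)
        return float(match.group(0)) if match else None

    weeks_part, hours_part = duration.split("\n", 1)
    weeks = re.search(r"\d+", weeks_part)
    hours = [float(n) for n in re.findall(r"\d+", hours_part)]
    if weeks is None or not hours:
        return None

    # Mean of the listed values: the midpoint for a range such as '2-4'.
    weekly_hours = sum(hours) / len(hours)
    return int(weeks.group(0)) * weekly_hours
```

Dividing by `len(hours)` instead of a fixed 2 keeps the result correct when the hours line holds a single number.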

## Pluralsight pipeline preprocessing

In [None]:
schema = StructType()\
            .add("title", StringType(), True)\
            .add("description", StringType(), True)\
            .add("authors", ArrayType(StringType()), True)\
            .add("rating", StringType(), True)\
            .add("votes_count", StringType(), True)\
            .add("students_count", StringType(), True)\
            .add("level", StringType(), True)\
            .add("duration", StringType(), True)\
            .add("platform", StringType(), True)\
            .add("free", BooleanType(), True)

pluralsight_dataset = spark.read.format("json")\
            .schema(schema)\
            .load("../input/pluralsight.json")

pluralsight_dataset.toPandas().head()

In [None]:
def normalize_authors(authors: list) -> str:
    if authors is None or len(authors) == 0:
        return None
    
    return authors[0]


def normalize_votes_count(votes_count: str) -> int:
    if votes_count is None:
        return None
    
    valid_votes_count = votes_count[1:-1]
    valid_votes_count = valid_votes_count.replace(',', '')
    return int(valid_votes_count)


def normalize_rating(rating: str) -> float:
    # Treat a missing rating the same way as an empty one
    if rating is None or len(rating) == 0:
        return None
    else:
        fa_stars = rating.split(';')
        total_rating = 0.0
        
        for fa_star in fa_stars:
            if fa_star == 'fa fa-star':
                total_rating += 1.0
            elif fa_star == 'fa fa-star-half-o':
                total_rating += 0.5
        
        return total_rating
    
    
def normalize_duration(duration: str) -> float:
    if duration is None:
        return None

    decimals = re.findall(r'\d+', duration)
    
    if len(decimals) == 2:
        final_decimal = float(np.round(float(int(decimals[0])) + float(int(decimals[1])) / 60.0, decimals=1))
        return final_decimal if final_decimal != 0.0 else 0.1
    elif len(decimals) == 1:
        final_decimal = float(np.round(float(int(decimals[0])) / 60.0, decimals=1))
        return final_decimal if final_decimal != 0.0 else 0.1
    else:
        return None


normalize_authors_udf = udf(lambda x: normalize_authors(x), StringType())
normalize_votes_count_udf = udf(lambda x: normalize_votes_count(x), IntegerType())
normalize_rating_udf = udf(lambda x: normalize_rating(x), DoubleType())
normalize_duration_udf = udf(lambda x: normalize_duration(x), DoubleType())


pluralsight_dataset_normalize = pluralsight_dataset.select(
    preprocessing_text_pipeline_udf(col("title")).alias("title"),
    normalize_authors_udf(col("authors")).alias("author"),
    normalize_rating_udf(col("rating")).alias("rating"),
    normalize_votes_count_udf(col("votes_count")).alias("votes_count"),
    col("students_count"),
    col("level"),
    normalize_duration_udf(col("duration")).alias("duration"),
    col("platform"),
    col("free")
)

pluralsight_dataset_normalize.toPandas().head()

In [None]:
pluralsight_dataset_normalize.select(col("duration")).distinct().collect()
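
The star-counting logic in `normalize_rating` can also be expressed with a lookup table. This sketch assumes the raw rating is a `;`-separated list of icon class names, as the comparison strings in the cell above suggest; `STAR_WEIGHTS` and `stars_to_rating` are hypothetical names.

```python
from typing import Optional

# Value contributed by each icon class; any other class (for example an
# empty-star icon) counts as zero.
STAR_WEIGHTS = {"fa fa-star": 1.0, "fa fa-star-half-o": 0.5}


def stars_to_rating(raw: Optional[str]) -> Optional[float]:
    """Sum the star icons in a ';'-separated class list."""
    if not raw:
        return None  # covers both None and the empty string
    return float(sum(STAR_WEIGHTS.get(icon.strip(), 0.0) for icon in raw.split(";")))
```

For example, four full stars followed by one half star add up to 4.5.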

## Alison pipeline preprocessing

In [None]:
schema = StructType()\
            .add("title", StringType(), True)\
            .add("description", StringType(), True)\
            .add("authors", ArrayType(StringType()), True)\
            .add("rating", StringType(), True)\
            .add("votes_count", StringType(), True)\
            .add("students_count", StringType(), True)\
            .add("level", StringType(), True)\
            .add("duration", StringType(), True)\
            .add("platform", StringType(), True)\
            .add("free", BooleanType(), True)

alison_dataset = spark.read.format("json")\
            .schema(schema)\
            .load("../input/alison.json")

alison_dataset.toPandas().head()

In [None]:
def normalize_authors(authors: list) -> str:
    if authors is None or len(authors) == 0:
        return None
    
    return authors[0]


def normalize_rating(rating: str) -> float:
    return None


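def normalize_students_count(students_count: str) -> int:
    if students_count is None:
        return None
    return int(students_count.replace(",", ""))


def normalize_duration(duration: str) -> float:
    if duration is None:
        return None
    # Half the sum of the numbers in the text (the midpoint of a two-number range)
    return float(np.round(np.sum([float(n) for n in re.findall(r'[\d.]+', duration)]) / 2.0, decimals=1))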


def normalize_level(level: str) -> str:
    if level is None:
        return "Mixed"

    # Keep an existing level instead of implicitly returning None
    return level
    

normalize_authors_udf = udf(lambda x: normalize_authors(x), StringType())
normalize_level_udf = udf(lambda x: normalize_level(x), StringType())
normalize_rating_udf = udf(lambda x: normalize_rating(x), DoubleType())
normalize_duration_udf = udf(lambda x: normalize_duration(x), DoubleType())
normalize_students_count_udf = udf(lambda x: normalize_students_count(x), IntegerType())

alison_dataset_normalize = alison_dataset.select(
    preprocessing_text_pipeline_udf(col("title")).alias("title"),
    normalize_authors_udf(col("authors")).alias("author"),
    normalize_rating_udf(col("rating")).alias("rating"),
    col("votes_count"),
    normalize_students_count_udf(col("students_count")).alias("students_count"),
    normalize_level_udf(col("level")).alias("level"),
    normalize_duration_udf(col("duration")).alias("duration"),
    col("platform"),
    col("free")
)

alison_dataset_normalize.toPandas().head()

In [None]:
alison_dataset_normalize.select(col("level")).distinct().collect()

## Udemy pipeline preprocessing

In [None]:
schema = StructType()\
            .add("title", StringType(), True)\
            .add("description", StringType(), True)\
            .add("authors", ArrayType(StringType()), True)\
            .add("rating", StringType(), True)\
            .add("votes_count", StringType(), True)\
            .add("students_count", StringType(), True)\
            .add("level", StringType(), True)\
            .add("duration", StringType(), True)\
            .add("platform", StringType(), True)\
            .add("free", BooleanType(), True)

udemy_dataset = spark.read.format("json")\
            .schema(schema)\
            .load("../input/udemy.json")

udemy_dataset.toPandas().head()

In [None]:
def normalize_level(level: str) -> str:
    # A missing level is reported as "Mixed" (pipeline rule 1)
    if level is None:
        return "Mixed"

    level = level.replace("Level: ", "")
    
    if level == "Expert":
        return "Advanced"
    elif level == "All Levels":
        return "Mixed"
    else:
        return level


def normalize_votes_count(votes_count: str) -> int:
    if votes_count is None:
        return None
    return int(re.search(r'\d+', votes_count.replace(",", "")).group(0))


def normalize_students_count(students_count: str) -> int:
    if students_count is None:
        return None
    return int(re.search(r'\d+', students_count.strip("\n").replace(",", "")).group(0))


def normalize_duration(duration: str) -> float:
    if duration is None:
        return None
    if " total hours" in duration:
        return float(duration.replace(" total hours", ""))
    elif " total hour" in duration:
        return float(duration.replace(" total hour", ""))
    elif " total mins" in duration:
        return float(np.round(float(duration.replace(" total mins", "")) / 60.0, decimals=1))
    else:
        return None


normalize_rating_udf = udf(lambda x: None if x is None else float(x), DoubleType())
normalize_level_udf = udf(lambda x: normalize_level(x), StringType())
normalize_votes_count_udf = udf(lambda x: normalize_votes_count(x), IntegerType())
normalize_students_count_udf = udf(lambda x: normalize_students_count(x), IntegerType())
normalize_duration_udf = udf(lambda x: normalize_duration(x), DoubleType())

udemy_dataset_normalize = udemy_dataset.select(
    preprocessing_text_pipeline_udf(col("title")).alias("title"),
    normalize_authors_udf(col("authors")).alias("author"),
    normalize_rating_udf(col("rating")).alias("rating"),
    normalize_votes_count_udf(col("votes_count")).alias("votes_count"),
    normalize_students_count_udf(col("students_count")).alias("students_count"),
    normalize_level_udf(col("level")).alias("level"),
    normalize_duration_udf(col("duration")).alias("duration"),
    col("platform"),
    col("free")
)

udemy_dataset_normalize.toPandas().head()

In [None]:
udemy_dataset_normalize.select(col("rating")).distinct().collect()
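
The three suffix checks in `normalize_duration` can also be written as one regular expression. This is a minimal sketch, assuming the raw strings look like `'12.5 total hours'`, `'1 total hour'`, or `'45 total mins'`; `DURATION_RE` and `parse_total_duration` are hypothetical names, and unlike the cell above the function does not round the result.

```python
import re
from typing import Optional

# '<number> total <unit>', where the unit is hour(s) or min(s).
DURATION_RE = re.compile(r"^\s*(\d+(?:\.\d+)?)\s+total\s+(hour|min)s?\s*$")


def parse_total_duration(raw: Optional[str]) -> Optional[float]:
    """Return the duration in hours, or None when the text does not match."""
    if raw is None:
        return None
    match = DURATION_RE.match(raw)
    if match is None:
        return None
    value, unit = float(match.group(1)), match.group(2)
    # Minutes are converted to hours; hours are returned unchanged.
    return value if unit == "hour" else value / 60.0
```

Anchoring the pattern at both ends means unexpected text yields `None` rather than a `ValueError` from `float`.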

## Skillshare pipeline preprocessing

In [None]:
schema = StructType()\
            .add("title", StringType(), True)\
            .add("description", StringType(), True)\
            .add("authors", ArrayType(StringType()), True)\
            .add("rating", StringType(), True)\
            .add("votes_count", StringType(), True)\
            .add("students_count", StringType(), True)\
            .add("level", StringType(), True)\
            .add("duration", StringType(), True)\
            .add("platform", StringType(), True)\
            .add("free", BooleanType(), True)

skillshare_dataset = spark.read.format("json")\
            .schema(schema)\
            .load("../input/skillshare.json")

skillshare_dataset.toPandas().head()

In [None]:
import ast


def normalize_level(level: str) -> str:
    correction = {
        "Advanced level": "Advanced",
        "Intermediate level": "Intermediate",
        "Beginner level": "Beginner",
        "All levels": "Mixed",
        "Beg/Int level": "Mixed",
        "--": "Mixed"
    }
    
    if level is None:
        return "Mixed"
    else:
        return correction[level]


def normalize_votes_count(votes_count: str) -> int:
    votes_int = []
    votes = votes_count[1:-1].split(",")
    
    for vote in votes:
        vote_cleaned = re.search(r"\d+", vote)
        votes_int.append(0 if vote_cleaned is None else int(vote_cleaned.group(0)))
    
    return int(np.sum(votes_int))


def normalize_rating(rating: str) -> float:
    rating_int = []
    rating = ast.literal_eval(rating)
    
    for mark, ratio in rating.items():
        int_ratio = ratio.replace("%", "")
        ratio = float(int_ratio) / 100.0 if len(int_ratio) > 0 else 0.0
        rating_int.append(int(mark) * ratio)
    
    final_rating = float(np.round(np.sum(rating_int), decimals=1))
    return final_rating
        
    
normalize_rating_udf = udf(lambda x: normalize_rating(x), DoubleType())
normalize_level_udf = udf(lambda x: normalize_level(x), StringType())
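normalize_students_count_udf = udf(lambda x: None if x is None else int(x), IntegerType())
# Raw Skillshare durations are in seconds; convert them to hours
normalize_duration_udf = udf(lambda x: None if x is None else float(x) / 3600.0, DoubleType())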
normalize_votes_count_udf = udf(lambda x: normalize_votes_count(x), IntegerType())

skillshare_dataset_normalize = skillshare_dataset.select(
    preprocessing_text_pipeline_udf(col("title")).alias("title"),
    normalize_authors_udf(col("authors")).alias("author"),
    normalize_rating_udf(col("rating")).alias("rating"),
    normalize_votes_count_udf(col("votes_count")).alias("votes_count"),
    normalize_students_count_udf(col("students_count")).alias("students_count"),
    normalize_level_udf(col("level")).alias("level"),
    normalize_duration_udf(col("duration")).alias("duration"),
    col("platform"),
    col("free")
)

skillshare_dataset_normalize.toPandas().head()
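
`normalize_rating` above turns a distribution of star marks into a single score by taking the weighted mean. The sketch below shows that arithmetic on a plain dictionary; the sample distribution is invented for illustration, and `weighted_rating` is a hypothetical name.

```python
from typing import Dict


def weighted_rating(distribution: Dict[str, str]) -> float:
    """Weighted mean of star marks, given shares such as {'5': '60%'}."""
    total = 0.0
    for mark, share in distribution.items():
        percent = share.replace("%", "").strip()
        # An empty share is treated as 0%, as in the cell above.
        ratio = float(percent) / 100.0 if percent else 0.0
        total += int(mark) * ratio
    return round(total, 1)


# Invented sample: 60% five-star, 30% four-star, 10% three-star votes.
sample = {"5": "60%", "4": "30%", "3": "10%", "2": "", "1": "0%"}
score = weighted_rating(sample)  # 5*0.6 + 4*0.3 + 3*0.1
```

The result is only meaningful when the shares add up to roughly 100%; a distribution with all shares empty yields 0.0 rather than a missing value.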

## FutureLearn pipeline preprocessing

In [None]:
schema = StructType()\
            .add("title", StringType(), True)\
            .add("description", StringType(), True)\
            .add("authors", ArrayType(StringType()), True)\
            .add("rating", StringType(), True)\
            .add("votes_count", StringType(), True)\
            .add("students_count", StringType(), True)\
            .add("level", StringType(), True)\
            .add("duration", StringType(), True)\
            .add("platform", StringType(), True)\
            .add("free", BooleanType(), True)

futurelearn_dataset = spark.read.format("json")\
            .schema(schema)\
            .load("../input/futurelearn.json")

futurelearn_dataset.toPandas().head()

In [None]:
def normalize_votes_count(votes_count: str) -> int:
    if votes_count is None:
        return None

    return int(re.search(r'\d+', votes_count).group(0))


def normalize_students_count(students_count: str) -> int:
    if students_count is None:
        return None

    return int(re.search(r'\d+', students_count.replace(",", "")).group(0))


def normalize_duration(duration: str) -> float:
    if duration is None:
        return None

    # The raw value holds a weeks part and an hours part separated by ';'
    weeks, hours = duration.split(";")

    return float(re.search(r'\d+', weeks).group(0)) * float(re.search(r'\d+', hours).group(0))


def normalize_rating(rating: str) -> float:
    if rating is None:
        return None

    return float(rating[:3])


normalize_rating_udf = udf(lambda x: normalize_rating(x), DoubleType())
normalize_votes_count_udf = udf(lambda x: normalize_votes_count(x), IntegerType())
normalize_students_count_udf = udf(lambda x: normalize_students_count(x), IntegerType())
normalize_duration_udf = udf(lambda x: normalize_duration(x), DoubleType())

futurelearn_dataset_normalize = futurelearn_dataset.select(
    preprocessing_text_pipeline_udf(col("title")).alias("title"),
    preprocessing_text_pipeline_udf(normalize_authors_udf(col("authors"))).alias("author"),
    normalize_rating_udf(col("rating")).alias("rating"),
    normalize_votes_count_udf(col("votes_count")).alias("votes_count"),
    normalize_students_count_udf(col("students_count")).alias("students_count"),
    col("level"),
    normalize_duration_udf(col("duration")).alias("duration"),
    col("platform"),
    col("free")
)

futurelearn_dataset_normalize.toPandas().head()

## Final dataframe merge

In [None]:
final_df = coursera_dataset_normalize\
                .union(stepik_dataset_normalize)\
                .union(edx_dataset_normalize)\
                .union(udemy_dataset_normalize)\
                .union(pluralsight_dataset_normalize)\
                .union(alison_dataset_normalize)\
                .union(skillshare_dataset_normalize)\
                .union(futurelearn_dataset_normalize)

final_df = final_df.orderBy(rand())
final_df.toPandas().head()

In [None]:
final_df.count()

In [None]:
final_df.select(col("platform")).distinct().collect()

In [None]:
final_df.distinct().count()

In [None]:
final_df.distinct().toPandas().to_csv('output/dataframe.csv', header=coursera_dataset_normalize.columns)
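
Before publishing the CSV, it can help to confirm that the merged rows follow the pipeline rules. The sketch below checks rows with the standard `csv` module. It assumes the column names used in this notebook (`level`, `votes_count`, `students_count`) and that missing values are written as empty fields; `find_rule_violations` is a hypothetical helper, and the two-row sample is invented.

```python
import csv
import io
from typing import Dict, Iterable, List, Tuple

VALID_LEVELS = {"Advanced", "Mixed", "Intermediate", "Beginner"}
COUNT_COLUMNS = ("votes_count", "students_count")


def find_rule_violations(rows: Iterable[Dict[str, str]]) -> List[Tuple[int, str]]:
    """Return (row_index, message) pairs for rows that break a rule."""
    problems = []
    for index, row in enumerate(rows):
        level = row.get("level", "")
        if level not in VALID_LEVELS:
            problems.append((index, f"invalid level: {level!r}"))
        for column in COUNT_COLUMNS:
            value = row.get(column, "")
            if value == "":
                continue  # a missing count is allowed
            try:
                # Accept '120' and '120.0'; reject '1,200' or '3.5'.
                is_whole = float(value).is_integer()
            except ValueError:
                is_whole = False
            if not is_whole:
                problems.append((index, f"{column} is not a whole number: {value!r}"))
    return problems


# Invented two-row sample standing in for the exported file.
sample_csv = io.StringIO(
    "title,level,votes_count,students_count\n"
    "Intro to Data Science,Beginner,120,4500\n"
    'Deep Learning,Expert,"1,200",\n'
)
violations = find_rule_violations(csv.DictReader(sample_csv))
```

To run the check on the real export, pass a `csv.DictReader` opened on `output/dataframe.csv` instead of the in-memory sample.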