<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Scraping-linkedin-Posts" data-toc-modified-id="Scraping-linkedin-Posts-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Scraping linkedin Posts</a></span><ul class="toc-item"><li><span><a href="#Utils" data-toc-modified-id="Utils-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Utils</a></span></li><li><span><a href="#Login" data-toc-modified-id="Login-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Login</a></span></li><li><span><a href="#Load-posts-page-&amp;-scroll-to-bottom" data-toc-modified-id="Load-posts-page-&amp;-scroll-to-bottom-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Load posts page &amp; scroll to bottom</a></span></li><li><span><a href="#Retrieve-data-from-loaded-page" data-toc-modified-id="Retrieve-data-from-loaded-page-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Retrieve data from loaded page</a></span></li><li><span><a href="#Saving-blog-posts-to-files" data-toc-modified-id="Saving-blog-posts-to-files-1.5"><span class="toc-item-num">1.5&nbsp;&nbsp;</span>Saving blog posts to files</a></span></li></ul></li></ul></div>

# Scraping linkedin Posts

In [1]:
try:
    from selenium import webdriver
except:
    %pip install selenium
    from selenium import webdriver

try:
    import unidecode
except:
    %pip install unidecode
    import unidecode

try:
    import pandas as pd
except:
    %pip install pandas
    import pandas as pd

In [2]:
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities

from bs4 import BeautifulSoup as bs
import time
from datetime import datetime, timedelta
import re as re
from importlib.metadata import version
from typing import Any, Dict, Optional
import os
import types
import logging
import tkinter as tk
import json
import hashlib

In [3]:
# LinkedIn company page whose posts are scraped ('/posts/' is appended later).
PAGE = 'https://www.linkedin.com/company/mgm-technology-partners-gmbh'
# Seconds to sleep after each scroll step before the page height is re-read.
SCROLL_PAUSE_TIME = 1.5
# Base data directory; the DATA_DIRECTORY environment variable overrides the default.
DATA_DIRECTORY = os.getenv('DATA_DIRECTORY') or 'data'
os.makedirs(DATA_DIRECTORY, exist_ok=True)

# Directory the generated blog post files are written to (see build_filename).
BLOGS_DIRECTORY = os.getenv('BLOGS_DIRECTORY') or f"{DATA_DIRECTORY}/blogs"
os.makedirs(BLOGS_DIRECTORY, exist_ok=True)

# Directory for intermediate results such as the raw posts file below.
TMP_DIRECTORY = os.getenv('TMP_DIRECTORY') or f"{DATA_DIRECTORY}/tmp_linkedin"
os.makedirs(TMP_DIRECTORY, exist_ok=True)

FILENAME_SOUP = "linkedin_soup.html"
# strftime/strptime format used by transformDate2String and transformString2Date.
INTERNAL_DATE_FORMAT = "%Y-%m-%d"
# Marker returned by extract_date_string_from_soup when no date was found.
NO_DATE = "__no_date__"

# JSON file the scraped blog containers are written to and read back from.
FILENAME_RAW_POSTS = f"{TMP_DIRECTORY}/raw_posts.json"

# Address of the remote Selenium service handed to webdriver.Remote.
SELENIUM_RUNNER = 'http://selenium:4444'
GLOBAL_BROWSER = None # We need to declare this global variable, will set it later

In [5]:
# Load the LinkedIn credentials from credentials.txt. If the file is missing or
# cannot be parsed, ask for them interactively and cache them in that file.
# NOTE: the credentials are stored in plain text - keep the file out of version control.
try:
    with open("credentials.txt", "r") as f:
        # Drop a trailing newline so it does not end up inside the password.
        contents = f.read().rstrip("\r\n")
    # Expected format: "username=<user>, password=<password>". Matching the two
    # "key=" markers (instead of splitting on every "=" and ",") keeps
    # passwords that contain "=" or "," intact.
    credentials_match = re.fullmatch(r"\s*\w+\s*=(.*?),\s*\w+\s*=(.*)", contents, re.DOTALL)
    if credentials_match is None:
        raise ValueError("credentials.txt does not have the expected format")
    username, password = credentials_match.groups()
except (OSError, ValueError):
    username = input('Enter your linkedin username: ')
    password = input('Enter your linkedin password: ')
    # Open the file only after both values were entered, so an aborted prompt
    # cannot leave an empty credentials file behind.
    with open("credentials.txt", "w") as f:
        f.write("username={}, password={}".format(username, password))

## Utils

In [6]:
def get_logger(name, log_level=logging.WARN):
    """Return the logger called ``name``, configured for use in the notebook.

    The logger does not propagate to the root logger, is set to ``log_level``
    on every call, and receives a single stream handler whose format includes
    the logger's name. An existing handler is reused, never duplicated.
    """
    named_logger = logging.getLogger(name)
    named_logger.propagate = False
    named_logger.setLevel(log_level)

    if named_logger.handlers:
        # Already set up by an earlier call - do not attach a second handler.
        return named_logger

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    named_logger.addHandler(stream_handler)
    return named_logger
    
def transformDate2String(dateToTransform: datetime) -> str:
    """Format ``dateToTransform`` using ``INTERNAL_DATE_FORMAT``.

    Returns an empty string (and logs an error) when the value cannot be
    formatted, e.g. because it is ``None`` or not a date object.
    """
    logger = get_logger(transformDate2String.__name__)
    try:
        return dateToTransform.strftime(INTERNAL_DATE_FORMAT)
    except Exception:
        # `except Exception` instead of a bare `except`, so KeyboardInterrupt
        # and SystemExit are no longer swallowed here.
        logger.error(f"Error transforming date: {dateToTransform}. Continuing with empty date string.")
        return ""

def transformString2Date(stringToTransform: str) -> Optional[datetime]:
    """Parse a string in ``INTERNAL_DATE_FORMAT`` into a ``datetime``.

    Returns ``None`` (and logs an error) when the string cannot be parsed.
    """
    logger = get_logger(transformString2Date.__name__)
    try:
        return datetime.strptime(stringToTransform, INTERNAL_DATE_FORMAT)
    except (TypeError, ValueError):
        # strptime raises ValueError for a non-matching string and TypeError
        # for a non-string argument. The previous error path called an
        # undefined `log` helper and therefore raised NameError instead of
        # returning None.
        logger.error(f"Error transforming string to date: {stringToTransform}")
        return None

def getNowAsString() -> str:
    """Return ``datetime.now()`` formatted by ``transformDate2String``."""
    now = datetime.now()
    return transformDate2String(now)

def getMinDateAsString() -> str:
    """Return the date 1970-01-01 formatted by ``transformDate2String``."""
    earliest = datetime(1970, 1, 1)
    return transformDate2String(earliest)

def stripBlanks(str):
    """Remove leading and trailing spaces and tabs (but not newlines)."""
    blanks = " \t"
    return str.lstrip(blanks).rstrip(blanks)
import logging

In [7]:
def writeDictToFile(*, dictionary: Dict, fullFilename: str) -> Dict:
    """Write ``dictionary`` as JSON to ``fullFilename``.

    The written data starts with a ``_stats`` entry (``lastWritten`` set to
    today's date string, ``counter`` set to the number of entries excluding
    ``_stats``), followed by all other entries sorted by key. The directory
    of ``fullFilename`` is created if necessary.

    The dictionary passed in is not modified.

    Returns:
        The dictionary exactly as it was written, including ``_stats``.

    Raises:
        TypeError: if ``dictionary`` is not a dict.
    """
    if not isinstance(dictionary, dict):
        raise TypeError("Expected a dictionary, but got a " + str(type(dictionary)))

    # Work on copies: the caller's dictionary (and its _stats entry, if any)
    # must not be changed as a side effect of writing it.
    entries = {key: value for key, value in dictionary.items() if key != "_stats"}
    stats = dict(dictionary.get("_stats") or {})
    stats["lastWritten"] = getNowAsString()
    stats["counter"] = len(entries)

    # _stats goes first, everything else follows in key order.
    sortedDictionary = {"_stats": stats, **dict(sorted(entries.items()))}
    dictDump = json.dumps(sortedDictionary, sort_keys=False, indent=2)

    # Make sure that the directory in which we want to write exists.
    directory = os.path.dirname(os.path.abspath(fullFilename))
    os.makedirs(directory, exist_ok=True)

    with open(fullFilename, 'w') as file:
        file.write(dictDump)
    return sortedDictionary

def readDictFromFile(*, fullFilename: str) -> Dict:
    """Read a JSON dictionary from ``fullFilename``.

    Logs a warning when the data has no ``_stats.lastWritten`` entry.

    Returns:
        The parsed dictionary, or an empty dict if the file contains JSON ``null``.

    Raises:
        OSError: if the file cannot be opened (a warning is logged first).
    """
    logger = get_logger(readDictFromFile.__name__, logging.INFO)
    try:
        # Plain read mode: "r+" would additionally require write permission.
        with open(fullFilename, "r") as file:
            data = json.load(file)
    except IOError:
        logger.warning(f"Could not open file {fullFilename}")
        raise
    if data is None:
        return {}
    if data.get("_stats", {}).get("lastWritten") is None:
        logger.warning(f"Read file {fullFilename} successfully but does not contain _stats.lastWritten.")
    return data

def test_writeDictToFile():
    """Smoke test: write a small sample dictionary to ``test.json``."""
    sample = {"hello": "world", "now": "what"}
    writeDictToFile(dictionary=sample, fullFilename="test.json")

def test_readDictFromFile():
    """Round-trip a dictionary through a file, then verify that reading the
    deleted file raises."""
    data = {
        "hello": "world",
        "now": "what"
    }
    FILENAME = "test.json"

    # Write and then read it; apart from _stats the content must be unchanged.
    writeDictToFile(dictionary=dict(data), fullFilename=FILENAME)
    data2 = readDictFromFile(fullFilename=FILENAME)
    readBack = {key: value for key, value in data2.items() if key != "_stats"}
    assert readBack == data, f"Round trip changed the data: {readBack}"

    # Delete test data, try to read it - even though it doesn't exist
    try:
        os.remove(FILENAME)
    except FileNotFoundError:
        pass
    try:
        readDictFromFile(fullFilename=FILENAME)
    except OSError:
        print("All good, I am in an exception as I expected it to be")
    else:
        # Previously a missing exception went unnoticed.
        raise AssertionError(f"Reading the deleted file {FILENAME} did not raise")

test_readDictFromFile()



All good, I am in an exception as I expected it to be


## Login

In [8]:
def create_loggedin_browser():
    """Start a remote Chrome session and submit the LinkedIn login form.

    Uses the module-level ``username`` / ``password`` and the Selenium
    service at ``SELENIUM_RUNNER``. The driver is returned right after the
    form was submitted; whether the login succeeded is not checked.

    If any step fails after the session was created, the session is quit
    before the exception is re-raised, so it does not stay open on the
    Selenium service.
    """
    logger = get_logger(create_loggedin_browser.__name__, logging.INFO)

    # Access the webdriver through the remote Selenium service.
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--incognito")

    logger.info('Requesting remote browser/driver...')
    browser = webdriver.Remote(SELENIUM_RUNNER, options=chrome_options)
    logger.info('Received remote browser/driver 😜')

    try:
        # Open login page
        browser.get('https://www.linkedin.com/login?fromSignIn=true&trk=guest_homepage-basic_nav-header-signin')

        # Enter login info and submit the form
        username_field = browser.find_element(by=By.ID, value='username')
        username_field.send_keys(username)

        password_field = browser.find_element(by=By.ID, value='password')
        password_field.send_keys(password)
        password_field.submit()
    except Exception:
        browser.quit()
        raise

    return browser

Note: To see the running browser/driver sessions on the selenium runner service, go [here](http://localhost:4444/ui#/sessions)

In [33]:
def get_loggedin_browser():
    """Replace the global browser session with a fresh logged-in one and return it.

    An existing session in ``GLOBAL_BROWSER`` is quit first (failures while
    quitting are logged and ignored). The new session comes from
    ``create_loggedin_browser`` and is stored in ``GLOBAL_BROWSER``.
    """
    logger = get_logger(get_loggedin_browser.__name__, logging.INFO)
    global GLOBAL_BROWSER # We need to explicitly declare that we mean the global var here...

    if GLOBAL_BROWSER is not None:
        logger.info('Quitting existing browser/driver session')
        try:
            GLOBAL_BROWSER.quit()
        except Exception:
            logger.warning('Failed quitting existing browser/driver. Ignoring, trying to create a new one anyways.')
        GLOBAL_BROWSER = None
    # Must call the factory, not this function itself: the previous
    # self-call recursed without end and never returned a browser.
    GLOBAL_BROWSER = create_loggedin_browser()
    return GLOBAL_BROWSER

get_loggedin_browser()

2023-10-29 08:49:12,566 - login_global_browser - INFO - Quitting existing browser/driver session
2023-10-29 08:49:12,665 - get_loggedin_browser - INFO - Requesting remote browser/driver...
2023-10-29 08:49:13,467 - get_loggedin_browser - INFO - Received remote browser/driver 😜


## Load posts page & scroll to bottom

In [34]:
def browser_go_to_page(browser, max_pages=0):
    """Open the company's posts page and scroll down step by step.

    Scrolling stops as soon as the page height no longer changes after a
    scroll step, or after ``max_pages`` steps when ``max_pages`` is greater
    than 0. Returns ``None``.
    """
    logger = get_logger(browser_go_to_page.__name__, logging.INFO)

    company_posts_page = PAGE + '/posts/'
    logger.info(f"{company_posts_page=}")
    browser.get(company_posts_page)

    height_script = "return document.body.scrollHeight"
    previous_height = browser.execute_script(height_script)
    scrolls_done = 0

    while True:
        # Jump to the current bottom of the page.
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        scrolls_done += 1
        logger.info(f"Scrolling page {scrolls_done}")

        # Pause before measuring again.
        time.sleep(SCROLL_PAUSE_TIME)

        current_height = browser.execute_script(height_script)
        if current_height == previous_height:
            break
        previous_height = current_height

        if max_pages > 0 and scrolls_done == max_pages:
            break

def get_page_source(browser, max_pages=0):
    """Load and scroll the posts page, then return the browser's page source."""
    get_logger(get_page_source.__name__, logging.INFO)  # sets up this function's logger
    browser_go_to_page(browser, max_pages)
    return browser.page_source

In [35]:
def get_linkedin_browser(max_pages=0):
    """Fetch a browser via ``get_loggedin_browser``, load and scroll the posts
    page with it, and return that browser."""
    loggedin_browser = get_loggedin_browser()
    browser_go_to_page(loggedin_browser, max_pages=max_pages)
    return loggedin_browser

## Retrieve data from loaded page

In [36]:
def retrieve_container_elements(max_pages):
    """Load the posts page and collect the elements that hold a post.

    Candidates are all elements with class ``occludable-update``; only those
    containing at least one ``update-components-actor`` element are kept.

    Returns:
        A tuple ``(container_elements, browser)``.
    """
    logger = get_logger(retrieve_container_elements.__name__, logging.INFO)
    browser = get_linkedin_browser(max_pages=max_pages)

    candidates = browser.find_elements(By.CLASS_NAME, "occludable-update")
    logger.info(f"No of container elements before filter: {len(candidates)}")

    container_elements = []
    for candidate in candidates:
        actors = candidate.find_elements(By.CLASS_NAME,"update-components-actor")
        if len(actors) > 0:
            container_elements.append(candidate)
    logger.info(f"No of container elements after filter: {len(container_elements)}")

    return container_elements, browser

In [37]:
def is_element_in_viewport(driver, element):
    """Run a script in the browser that checks whether the bounding rectangle
    of ``element`` lies completely inside the window, and return its result."""
    viewport_check_js = """
        var elem = arguments[0];
        var rect = elem.getBoundingClientRect();
        return (
            rect.top >= 0 &&
            rect.left >= 0 &&
            rect.bottom <= (window.innerHeight || document.documentElement.clientHeight) &&
            rect.right <= (window.innerWidth || document.documentElement.clientWidth)
        );
    """
    return driver.execute_script(viewport_check_js, element)

In [38]:
def get_post_url(browser):
    """Click the 'Copy link to post' entry and return the URL from the clipboard.

    Returns ``None`` when the entry is not found exactly once or when the
    click / clipboard access fails.

    NOTE(review): the clipboard is read through tkinter on the machine running
    this code, while the browser is a remote Selenium session - confirm that
    both share a clipboard in the target setup.
    """
    logger = get_logger(get_post_url.__name__, logging.WARN)
    elements = browser.find_elements(By.XPATH, "//*[text()='Copy link to post']")
    if len(elements) != 1:
        logger.warning(f"Number of list of elements that should give me the URL of the blogpost: {len(elements)}")
        return None

    root = None
    try:
        elements[0].click()
        root = tk.Tk()
        root.withdraw()  # only the clipboard is needed, do not show a window
        blog_post_url = root.clipboard_get()
        logger.info(f"URL of blog post: {blog_post_url}")
        return blog_post_url
    except Exception as e:
        logger.warning(f"Could not extract blog post url, retrurning None. Error: {e}")
        return None
    finally:
        # Without this every call left one Tk root (window) behind.
        if root is not None:
            root.destroy()

def extract_blog_post_url_from_container_element(browser, container_element):
    """Open the control menu of a post container and fetch the post's URL.

    The container must contain exactly one element with class
    ``feed-shared-control-menu__trigger``. That button is scrolled into view
    and clicked, then ``get_post_url`` is asked for the URL. ESC key presses
    are sent before and after, presumably to close menus that are still open.

    Returns:
        The URL from ``get_post_url`` (which may be ``None``), or ``None``
        when the button is missing, ambiguous or not displayed.
    """
    logger = get_logger(extract_blog_post_url_from_container_element.__name__, logging.INFO)
    buttons = container_element.find_elements(By.CLASS_NAME, 'feed-shared-control-menu__trigger')
    if len(buttons) != 1:
        logger.info(f"No of buttons found in container: {len(buttons)}. Cannot process this container.")
        return None

    button = buttons[0]
    actions = ActionChains(browser)
    actions.send_keys(Keys.ESCAPE).perform()
    browser.execute_script('arguments[0].scrollIntoView({ behavior: "smooth", block: "center", inline: "nearest" });', button)

    if not button.is_displayed():
        # logger.warning: `warn` is a deprecated alias and the rest of the
        # file already uses `warning`.
        logger.warning("Button not displayed, cannot process container")
        return None

    actions.send_keys(Keys.ESCAPE).perform()
    time.sleep(1)
    button.click()
    time.sleep(5)  # fixed wait before the menu entry is looked up
    url = get_post_url(browser)
    actions.send_keys(Keys.ESCAPE).perform()
    return url

In [39]:
def write_blog_containers_to_file(blogs):
    """Serialize the blog containers and write them to ``FILENAME_RAW_POSTS``.

    ``blogs`` maps blog ids to blog dicts. Each blog's ``soup`` is stored as
    its prettified HTML string. The dicts passed in are left unchanged.
    Write errors are logged as warnings and not raised.
    """
    logger = get_logger(write_blog_containers_to_file.__name__, logging.INFO)
    # Prepare blogs to be saveable i.e. serializable
    blogs_to_save = {}
    for blog_id, blog in blogs.items():
        # Shallow copy: the caller keeps its BeautifulSoup object instead of
        # having it replaced by a string.
        blog_to_save = dict(blog)
        soup = blog["soup"]
        blog_to_save["soup"] = soup if isinstance(soup, str) else soup.prettify()
        blogs_to_save[blog_id] = blog_to_save
    try:
        writeDictToFile(dictionary=blogs_to_save,fullFilename=FILENAME_RAW_POSTS)
    except Exception as e:
        logger.warning(f"could not write {len(blogs)} blog containers to file {FILENAME_RAW_POSTS}: {e}")
    return

def read_blog_containers_from_file():
    """Read the blog containers back from ``FILENAME_RAW_POSTS``.

    The ``_stats`` bookkeeping entry is dropped and every blog's ``soup``
    string is parsed into a BeautifulSoup object again.

    Returns:
        A dict mapping blog ids to blog dicts.

    Raises:
        Any exception raised while reading the file (a warning is logged first).
    """
    logger = get_logger(read_blog_containers_from_file.__name__, logging.INFO)
    try:
        stored = readDictFromFile(fullFilename=FILENAME_RAW_POSTS)
    except Exception:
        logger.warning(f"Could not read blog containers from file {FILENAME_RAW_POSTS}. Raising Error.")
        raise

    blogs = {}
    for blog_id, blog in stored.items():
        if blog_id == "_stats":
            # Written by writeDictToFile; it is not a blog container.
            continue
        blog["soup"] = bs(blog["soup"], "html.parser")  # convert string to BeautifulSoup object
        blogs[blog_id] = blog
    logger.info(f"Read {len(blogs)} blog containersc from file {FILENAME_RAW_POSTS}.")
    return blogs

In [40]:
def extract_text_from_soup(soup: bs):
    """Return the text of a post, or an empty string if it cannot be found.

    The text is taken from the first ``<span dir="ltr">`` inside the first
    ``<div>`` with class ``feed-shared-update-v2__description-wrapper`` and
    is stripped of leading and trailing whitespace.
    """
    logger = get_logger(extract_text_from_soup.__name__, log_level=logging.INFO)

    text_box = soup.find("div", {"class":"feed-shared-update-v2__description-wrapper"})
    # The span may be missing even when the wrapper exists; treat that like a
    # missing wrapper instead of failing with AttributeError.
    text_span = text_box.find("span", {"dir":"ltr"}) if text_box is not None else None
    if text_span is None:
        logger.warning("Could not extract text from soup!")
        return ""
    return text_span.text.strip()

In [41]:
def generate_id_from_text(blog_text):
    """Return the SHA-256 hex digest of ``blog_text`` as the blog's id."""
    encoded_text = blog_text.encode()
    return hashlib.sha256(encoded_text).hexdigest()

In [42]:
def extract_blogs_from_container_elements(browser, container_elements):
    """Turn the container elements into blog dicts and write them to file.

    For each element the post URL, its outer HTML and the parsed soup are
    collected. Elements without HTML source or without extractable text are
    skipped. The blog id is the hash of the post text.

    Returns:
        A dict mapping blog ids to dicts with the keys ``url``, ``source``,
        ``soup`` and ``scrape_date``.
    """
    logger = get_logger(extract_blogs_from_container_elements.__name__, logging.INFO)
    blogs = {}
    for container_element in container_elements:
        blog_url = extract_blog_post_url_from_container_element(browser, container_element)
        blog_source = container_element.get_attribute('outerHTML')
        if not blog_source:
            logger.warning("Container has no HTML source and is skipped")
            continue
        # Same parser as read_blog_containers_from_file, so a freshly scraped
        # soup and one re-read from file are built the same way.
        blog_soup = bs(blog_source, "html.parser")
        blog_text = extract_text_from_soup(blog_soup)
        if not blog_text:
            logger.warning("Cannot extract text from container, so container has no value and is skipped")
            continue
        blogs[generate_id_from_text(blog_text)] = {
            "url": blog_url,
            "source": blog_source,
            "soup": blog_soup,
            "scrape_date": getNowAsString(),
        }

    logger.info(f"No of extracted blogs: {len(blogs)}")
    write_blog_containers_to_file(blogs)
    return blogs

In [43]:
def get_blog_containers(force_retrieval=False, max_pages=0):
    """Return the blog containers, from file if possible, else from the website.

    With ``force_retrieval`` the file is ignored. Without it the containers
    are read from file, and the website is only used when reading fails.
    ``max_pages`` is passed on to the website retrieval.
    """
    logger = get_logger(get_blog_containers.__name__, logging.INFO)

    if force_retrieval:
        logger.info(f"Retrieving blog containers: {force_retrieval=} {max_pages=}")
    else:
        try:
            return read_blog_containers_from_file()
        except Exception:
            logger.warning(f"Could not read blog containers from file, retrieving from website")

    # Reached when retrieval was forced or the file could not be read.
    container_elements, browser = retrieve_container_elements(max_pages)
    return extract_blogs_from_container_elements(browser, container_elements)

blog_container = get_blog_containers(force_retrieval=False, max_pages=3)

2023-10-29 08:49:37,264 - get_loggedin_browser - INFO - Requesting remote browser/driver...


SessionNotCreatedException: Message: Could not start a new session. Timed out creating session 
Host info: host: '47fac3e4c315', ip: '172.28.0.3'
Build info: version: '4.14.1', revision: '03f8ede370'
System info: os.name: 'Linux', os.arch: 'amd64', os.version: '6.4.16-linuxkit', java.version: '11.0.20.1'
Driver info: driver.version: unknown
Stacktrace:
    at org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue.failDueToTimeout (LocalNewSessionQueue.java:465)
    at java.lang.Iterable.forEach (Iterable.java:75)
    at org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue.timeoutSessions (LocalNewSessionQueue.java:181)
    at org.openqa.selenium.concurrent.GuardedRunnable.lambda$guard$0 (GuardedRunnable.java:35)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:515)
    at java.util.concurrent.FutureTask.runAndReset (FutureTask.java:305)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:305)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:628)
    at java.lang.Thread.run (Thread.java:829)

In [None]:
def extract_date_string_from_soup(soup: bs):
    """Find the relative post date (e.g. ``"1yr •"``) in the prettified soup.

    Returns the first match of one or two digits followed by ``h``, ``d``,
    ``w``, ``mo`` or ``yr``, a whitespace character and ``•``. Returns
    ``NO_DATE`` (and logs an error) when there is no match.
    """
    logger = get_logger(extract_date_string_from_soup.__name__, log_level=logging.WARN)

    relative_date_pattern = re.compile(r'\d{1,2}(h|d|w|mo|yr)\s•')
    found = relative_date_pattern.search(soup.prettify())
    if not found:
        logger.error(f"Could not extract human readable date from soup! soup: {soup}")
        return NO_DATE

    dateHumanReadable = found.group()
    logger.info(f"Match found: {dateHumanReadable}")
    return dateHumanReadable

def test_extract_date_string_from_soup():
    """Print the relative date found in the first available blog container."""
    containers = get_blog_containers()
    # The containers are a dict keyed by blog id, so there is no index 0;
    # take the first real container instead.
    candidates = [container for key, container in containers.items() if key != "_stats"]
    if not candidates:
        print("No blog containers available")
        return
    human_readable_date = extract_date_string_from_soup(candidates[0]["soup"])
    print(human_readable_date)

test_extract_date_string_from_soup()

In [None]:
def linkedin_rel_date2datetime(relative_date):
    """Transform a relative date from LinkedIn (e.g. ``"6d •"``) to a datetime.

    The amount is the first one or two digits in the string, the unit the
    first of ``h``, ``d``, ``w``, ``mo``, ``yr``. A month counts as 30 days
    and a year as 365 days.

    Returns:
        ``datetime.now()`` minus the given time span.

    Raises:
        ValueError: if no amount or no unit is found in ``relative_date``.
    """
    logger = get_logger(linkedin_rel_date2datetime.__name__, log_level=logging.WARN)

    amount_match = re.search(r'\d{1,2}', relative_date)
    if amount_match is None:
        # The bare `exit` used here before was a no-op; fail explicitly.
        logger.error(f"Amount not found in {relative_date}")
        raise ValueError(f"Amount not found in {relative_date}")

    unit_match = re.search(r'(h|d|w|mo|yr)', relative_date)
    if unit_match is None:
        logger.error(f"Unit not found in {relative_date}")
        raise ValueError(f"Unit not found in {relative_date}")

    hours_per_unit = {'h': 1, 'd': 24, 'w': 7*24, 'mo': 30*24, 'yr': 365*24}
    amount = float(amount_match.group()) * hours_per_unit[unit_match.group()]
    logger.info(f" {relative_date} --> Amount in hours: {amount}")

    # Calculate the date from today's, and return it
    return datetime.now() - timedelta(hours=amount)

# Some tests
rel_dates = ['2h •', '3d •', '1w •']
for rel_date in rel_dates:
    print(f"{rel_date} --> {linkedin_rel_date2datetime(rel_date)}")

In [None]:
def simplify_content(content):
    """Normalize line breaks and escape curly braces in ``content``.

    Spaces following a newline are removed, every run of newlines becomes
    exactly two newlines, and ``{`` / ``}`` are replaced by their HTML
    entities.
    """
    without_indent = re.sub('\n +', '\n', content)
    simplified = re.sub('\n+', '\n\n', without_indent)
    for brace, entity in (("{", "&#123;"), ("}", "&#125;")):
        simplified = simplified.replace(brace, entity)
    return simplified
    
def extract_all_from_container(container):
    """Build a blog post dict from one blog container.

    The result has the keys ``date_human_readable``, ``posted_date``,
    ``text`` and ``original_url``.
    """
    logger = get_logger(extract_all_from_container.__name__, logging.INFO)

    date_human_readable = extract_date_string_from_soup(container["soup"])
    posted_date = linkedin_rel_date2datetime(date_human_readable)
    text = simplify_content(extract_text_from_soup(container["soup"]))

    blog_post = {
        "date_human_readable": date_human_readable,
        "posted_date": posted_date,
        "text": text,
        "original_url": container["url"],
    }
    logger.info(f"{blog_post['posted_date']} - {blog_post['text'][:30]}")
    return blog_post

def extract_all_from_containers():
    """Convert all blog containers into blog post dicts.

    Containers that cannot be converted are logged as warnings and skipped.

    Returns:
        A list of the blog posts built by ``extract_all_from_container``.
    """
    logger = get_logger(extract_all_from_containers.__name__, logging.INFO)
    containers = get_blog_containers()
    blog_posts = []

    # The containers are a dict keyed by blog id: iterate over the container
    # dicts themselves - iterating the dict directly only yields the ids.
    for container_no, (blog_id, container) in enumerate(containers.items()):
        if blog_id == "_stats":
            continue  # bookkeeping entry, not a container
        try:
            logger.info(f"Processing container # {container_no}")
            blog_posts.append(extract_all_from_container(container))
        except Exception as e:
            logger.warning(f"Container # {container_no} not added: {str(e)}")
    return blog_posts

blog_posts = extract_all_from_containers()

In [None]:
# Sanity check: every container should have produced a blog post.
containers = get_blog_containers()
if len(blog_posts) != len(containers):
    # The message needs the f prefix, otherwise the placeholders are printed literally.
    print(f"Not all containers could be transformed to blog_posts! No of containers: {len(containers)}, no of blog posts: {len(blog_posts)}")

In [None]:
# Print one of the extracted posts for manual inspection (index 1 = the second post).
blog_post_index = 1
print(blog_posts[blog_post_index])
#blog_posts

## Saving blog posts to files

In [None]:
def simplify_text(some_text: str) -> str:
    """Reduce ``some_text`` to ASCII letters, ``-`` and ``_``.

    The text is transliterated with ``unidecode``; every run of other
    characters (including digits and spaces) becomes a single ``_``, and
    repeated underscores are collapsed.
    """
    simplified_text = some_text.replace('"', "'")
    simplified_text = unidecode.unidecode(simplified_text)
    # Raw string: "\-" in a normal string literal is an invalid escape sequence.
    simplified_text = re.sub(r"[^A-Za-z\-_]+", "_", simplified_text)
    simplified_text = re.sub('_+', '_', simplified_text)
    return simplified_text

In [None]:
def build_title(blog_post):
    """Build a short title from the beginning of the post's text.

    The title consists of the first 35 characters of ``blog_post["text"]``,
    extended to the end of the word that is cut at that position. Any
    whitespace (space, newline, tab) ends a word. Newlines inside the title
    are replaced by spaces.
    """
    LEN_OF_TITLE = 35
    text = blog_post["text"]

    # Extend to the end of the current word.
    end = min(LEN_OF_TITLE, len(text))
    while end < len(text) and not text[end].isspace():
        end += 1

    # Replace newlines with spaces in the final title
    return text[:end].replace('\n', ' ')

def build_simplified_title(blog_post: Dict) -> str:
    """Return the post's title after passing it through ``simplify_text``."""
    title = build_title(blog_post)
    return simplify_text(title)

In [None]:
def build_filename(blog_post: Dict) -> str:
    """Build the markdown filename for a blog post.

    The name has the form ``<BLOGS_DIRECTORY>/<date>-<simplified title>.md``.
    The date is the post's ``posted_date`` in ``INTERNAL_DATE_FORMAT``, or
    ``_no_date_`` when it cannot be formatted. The simplified title is cut
    to ``LEN_OF_FILENAME - 13`` characters.
    """
    logger = get_logger(build_filename.__name__, logging.INFO)
    LEN_OF_FILENAME = 45
    posted_date = blog_post["posted_date"]
    try:
        posted_date_for_filename = posted_date.strftime(INTERNAL_DATE_FORMAT)
    except (AttributeError, TypeError, ValueError):
        # Must set the same variable that the filename below uses; the
        # fallback used to go into a different, unused name.
        posted_date_for_filename = "_no_date_"
    simplified_title = build_simplified_title(blog_post)[:LEN_OF_FILENAME-13]
    filename = f"{BLOGS_DIRECTORY}/{posted_date_for_filename}-{simplified_title}.md"
    logger.info(filename)
    return filename

In [None]:
# Preview the target filename of every extracted post.
for blog_post in blog_posts:
    print(build_filename(blog_post))

In [None]:
def build_frontmatter(blog_post):
    """Build the front matter block that precedes the post text in its file.

    It contains ``layout``, ``date``, ``title`` and ``originalUrl``. A missing
    original URL (``None``) is written as an empty string.
    """
    posted_date = blog_post["posted_date"]
    # Escape backslashes and double quotes so the title remains a valid
    # double-quoted value.
    title = build_title(blog_post).replace("\\", "\\\\").replace('"', '\\"')
    # The URL is None when it could not be scraped.
    original_url = blog_post["original_url"] or ""
    frontMatter = ("---\n"
           "layout: post\n"
           "date: " + transformDate2String(posted_date) + "\n"
           'title: "' + title + '"\n'
           "originalUrl: \"" + original_url + "\"\n")
           #"tags: linkedin " + linkedin_user_based_tags + "\n" +
           #"author: \"" + author + "\"\n")
    frontMatter += "---\n\n"
    return frontMatter

In [None]:
def save_blog_post_to_file(blog_post: Dict) -> None:
    """Write one blog post (front matter followed by its text) to its file.

    The filename comes from ``build_filename``; its directory is created if
    it does not exist yet.
    """
    content = blog_post["text"]
    filename = build_filename(blog_post)
    frontmatter = build_frontmatter(blog_post)
    path = os.path.dirname(filename)
    if path:
        os.makedirs(path, exist_ok=True)
    # Explicit UTF-8: post texts can contain characters (e.g. emojis) that a
    # platform's default encoding cannot represent. The `with` block closes
    # the file, no explicit close() is needed.
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(frontmatter)
        file.write(content)

def save_blog_posts_to_file(blog_posts):
    """Write every post in ``blog_posts`` to its own file."""
    for post in blog_posts:
        save_blog_post_to_file(post)

save_blog_posts_to_file(blog_posts)