# Auto-post to social media sites
Schedule: one carousel post per day per Page (set up n cron jobs, each passing one category as its parameter)

In [117]:
from PIL import Image, ImageDraw, ImageFont
import pymongo
import os
import requests
import time

In [118]:
from dotenv import load_dotenv
# Load variables from a .env file; override=True lets values from the file
# replace variables that are already set in the process environment.
load_dotenv(override=True) 
# LinkedIn API settings. Each lookup raises KeyError when the variable is missing.
# LI_URI is used as the prefix of the 'v2/...' endpoints below, so it must end with '/'.
LI_URI = os.environ['LI_URI']
# NOTE(review): LI_REST_URI is not referenced in the visible part of this notebook.
LI_REST_URI = os.environ['LI_REST_URI']
# Sent as the 'LinkedIn-Version' request header.
LI_VERSION = os.environ['LI_VERSION']

In [119]:
# Open the MongoDB connection from the DB_URI environment variable
# (KeyError if it is not set) and keep handles to the collections used below.
db_client = pymongo.MongoClient(os.environ['DB_URI'])
db = db_client['db_certificates']   
tb_advertising_posts = db['tb_advertising_posts']   #post contents to publish on social media (looked up by 'category' and 'posted_li_img')
tb_cert_metadata = db['tb_cert_metadata']    #metadata of certifications (looked up by 'symbol')
tb_linkedin_app = db['tb_linkedin_app']    #LinkedIn app info (looked up by 'app_id')
tb_linkedin_page = db['tb_linkedin_page']    #LinkedIn Page info (looked up by 'category')

In [120]:
def get_timestamp():
    """Return the current Unix time, truncated to whole seconds, as a string."""
    whole_seconds = int(time.time())
    return f"{whole_seconds}"

def wrap_text(text, font, max_width, draw):
    """
    Split text into lines whose rendered width fits within max_width.

    The text is split on whitespace and words are packed greedily: a word is
    added to the current line as long as the line, measured with
    draw.textlength(), stays within max_width. Words are never broken, so a
    single word wider than max_width ends up alone on its own line.

    Args:
        text: the text to wrap; runs of whitespace (including newlines) are
            treated as a single separator.
        font: font passed through to draw.textlength().
        max_width: maximum allowed line width, in the unit returned by
            draw.textlength().
        draw: object exposing textlength(text, font=...).

    Returns:
        The list of wrapped lines, without empty entries; an empty list when
        text contains no words.
    """
    lines = []
    current_line = ""

    for word in text.split():
        candidate = f"{current_line} {word}" if current_line else word
        if draw.textlength(candidate, font=font) <= max_width:
            current_line = candidate
            continue

        # Flush only a non-empty line: when the very first word is already
        # too wide there is nothing to flush yet, and appending "" here would
        # produce a blank first line.
        if current_line:
            lines.append(current_line)
        current_line = word

    if current_line:
        lines.append(current_line)

    return lines

def draw_wrapped_text(draw, position, text, font, max_width, fill=(255,255,255), line_spacing=6):
    """
    Draw text starting at position, wrapped so each line fits within max_width.

    Lines come from wrap_text(); every line is drawn at the same x, and y
    advances by font.size + line_spacing after each line.

    Args:
        draw: drawing object providing text() and textlength().
        position: (x, y) of the first line.
        text: the text to render.
        font: font used for measuring and drawing; its size attribute sets
            the line height.
        max_width: maximum line width handed to wrap_text().
        fill: text color (defaults to white).
        line_spacing: extra vertical gap added between lines.
    """
    left, top = position

    for segment in wrap_text(text, font, max_width, draw):
        draw.text((left, top), segment, font=font, fill=fill)
        top += font.size + line_spacing


In [121]:
#root folder of this project; the path constants below are built by plain string
#concatenation, so the trailing '/' is required
#NOTE(review): this is an absolute path on one developer's machine - it has to be
#adjusted before running anywhere else (e.g. from cron on a server)
CURRENT_PATH = '/Users/sang/Documents/Source/Python/python_webscrap/cert_exam/'

In [122]:
# Call-to-action text: CTA_LINKS is for posts that list the links inline,
# CTA_LINKS_COMMENT points the reader to the comments instead.
CTA_LINKS = (
    '\nCheck out the practice questions here '
    'to help you pass these certifications on your first try\n'
)
CTA_LINKS_COMMENT = (
    '\nCheck out the practice questions in the comments '
    'to help you pass these certifications on your first try\n'
)

In [None]:
#input assets (all paths are relative to CURRENT_PATH, which ends with '/')
FONT_PATH = f"{CURRENT_PATH}font/GothamBlack.ttf"
CAROUSEL_TEMPLATE_PATH_URI = f"{CURRENT_PATH}template/carousel_"  #prefix; the template id is appended to it
IMG_LOGO_PATH_PREFIX = f"{CURRENT_PATH}logo/"
#output folder of the final carousels (one sub-folder per post)
CAROUSEL_OUTPUT_PATH_URI = f"{CURRENT_PATH}output/"


In [124]:
#Layout description of the carousel templates, indexed by template id.
#NOTE(review): the plan is 5 templates (ids 0 to 4) but only the first one is defined so far.
#Each entry holds text widths/font sizes plus 'position_mapping': one dict per slide,
#in slide order, giving the positions of the elements drawn on that slide.
#Positions are presumably (x, y) pixel coordinates of the element's top-left corner - confirm against the templates.
CAROUSEL_TEMPLATE_POSITION_MAPPING = [
    #carousel 1
    {
        'hook_w': 950,  #max width of the wrapped hook text on slide 1
        'header_font_size': 50,  #NOTE(review): not read by the visible code yet
        'content_font_size': 40,  #NOTE(review): not read by the visible code yet
        'position_mapping': [
            #slide 1: hook
            {
                "header": (60, 285)
            },
            #slide 2: problems
            {
                "header": (115, 390),
                "content": (115, 680)
            },
            #slide 3: insight
            {
                "header": (115, 390),
                "content": (115, 680)
            },
            #slide 4: values
            {
                "header": (115, 390),
                "content": (115, 680)
            },
            #slide 5: logo
            {
                "link": (53, 100),
                "link_w": 980,
                "logo": [(53, 290), (53, 630), (53, 970)], #list of positions of each logo
                "title": [(370, 290), (370, 630), (370, 970)], #list of positions of each title
                "title_w": 700,
                "salary": [(370, 535), (370, 880), (370, 1225)] #list of positions of each salary text
            }
        ]
    }
]

In [125]:
def draw_hook_slide(carousel_template_folder_path, carousel_output_folder_path, hook_text, hook_position, hook_w, hook_font_size=100):
    """
    Draw the hook text on the first template slide and save the result.

    The template is read from '<carousel_template_folder_path>/1.png' and the
    finished slide is written to '<carousel_output_folder_path>/1.png'; the
    output folder is created when it does not exist yet.

    Args:
        carousel_template_folder_path: folder containing the template slides.
        carousel_output_folder_path: folder receiving the generated slide.
        hook_text: text to draw (wrapped, in black).
        hook_position: (x, y) of the first text line.
        hook_w: maximum width of a text line.
        hook_font_size: size of the hook font loaded from FONT_PATH.

    Returns:
        The path of the saved slide, or None when the template image could
        not be opened.
    """
    hook_file_name = '/1.png'
    template_path = carousel_template_folder_path + hook_file_name
    try:
        base = Image.open(template_path).convert("RGBA")
    except OSError as e:
        # Pillow reports missing, unreadable and unidentifiable image files as
        # OSError subclasses; print the real cause instead of assuming "not found".
        print(f'Template image could not be opened: {template_path} ({e})')
        return None

    draw_wrapped_text(
        draw=ImageDraw.Draw(base),
        position=hook_position,
        text=hook_text,
        font=ImageFont.truetype(FONT_PATH, hook_font_size),
        max_width=hook_w,
        fill=(0, 0, 0)     #black color
    )
    #create new folder, if any
    os.makedirs(carousel_output_folder_path, exist_ok=True)

    output_path = carousel_output_folder_path + hook_file_name
    base.convert("RGB").save(output_path, "PNG")
    print(f"Final image saved to: {carousel_output_folder_path}")
    return output_path

In [126]:
def draw_carousel_images(page_url, post_details):
    """
    Generate the carousel slides of one post (currently only the hook slide).

    Args:
        page_url: link of the page; not used by the current implementation.
        post_details: post document; its '_id' names the output folder and
            its 'hook' is the text drawn on the first slide.
    """
    template_index = 0    #todo: generate template id from 0 to 4
    template_spec = CAROUSEL_TEMPLATE_POSITION_MAPPING[template_index]
    slide_positions = template_spec['position_mapping']   #position of all slides
    #1. draw hook
    draw_hook_slide(
        CAROUSEL_TEMPLATE_PATH_URI + str(template_index),
        CAROUSEL_OUTPUT_PATH_URI + str(post_details["_id"]),
        post_details['hook'],
        slide_positions[0]['header'],
        template_spec['hook_w'],
    )

In [127]:
#get upload link from LI for the page
def get_LI_upload_link(owner_id, li_headers, timeout=30):
    """
    Register an image upload with LinkedIn and return the decoded response.

    Posts a 'feedshare-image' registerUpload request to
    LI_URI + 'v2/assets?action=registerUpload' on behalf of owner_id.

    NOTE(review): the original comment said the returned info "can be used for
    many images" - confirm against the LinkedIn Assets API before reusing one
    registration for several files.

    Args:
        owner_id: URN of the owner placed in the request's "owner" field.
        li_headers: HTTP headers sent with the request.
        timeout: seconds to wait for LinkedIn before giving up, so that a
            stalled connection cannot hang a scheduled run forever.

    Returns:
        The JSON body of the response as a dict. When the request fails or
        the body is not valid JSON, {'error': <exception>} is returned instead.
        The HTTP status is not checked, so an error body sent by LinkedIn is
        returned as-is.
    """
    url = LI_URI + 'v2/assets?action=registerUpload'
    payload = {
        "registerUploadRequest": {
            "recipes": [
                "urn:li:digitalmediaRecipe:feedshare-image"
            ],
            "owner": owner_id,
            "serviceRelationships": [
                {
                    "relationshipType": "OWNER",
                    "identifier": "urn:li:userGeneratedContent"
                }
            ]
        }
    }
    try:
        detail = requests.post(url, json=payload, headers=li_headers, timeout=timeout)
        return detail.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        print('get_LI_upload_link', e)
        return {'error': e}

In [128]:
#output: one composed PNG image saved to output_path
def create_certifications_image(
    page_link:str,
    cert_symbols: list,
    cert_names: list,
    salary_range: str,
    output_path: str
):
    """
    Compose a single image with certification logos, names, salary and link.

    Starts from the template at IMG_TEMPLATE_PATH, pastes one logo per symbol,
    writes the certification names, the salary range and the page link, then
    saves the result as PNG.

    NOTE(review): IMG_TEMPLATE_PATH, LOGO_POSITIONS, NAME_POSITIONS,
    SALARY_POSITION and FOOTER_URL_POSITION are not defined in the visible
    part of this file; they must exist before this function is called.

    Args:
        page_link: text drawn in the footer.
        cert_symbols: certification symbols; each logo is loaded from
            IMG_LOGO_PATH_PREFIX + symbol + '.png'.
        cert_names: certification names, drawn at NAME_POSITIONS in order.
        salary_range: text appended to 'Salary range: '.
        output_path: file path of the generated PNG.

    Raises:
        IndexError: when there are more symbols or names than configured
            positions.
    """
    base = Image.open(IMG_TEMPLATE_PATH).convert("RGBA")
    draw = ImageDraw.Draw(base)

    # ---- Add logo images ----
    for index, symbol in enumerate(cert_symbols):
        overlay_img = Image.open(IMG_LOGO_PATH_PREFIX + symbol + '.png').convert("RGBA")
        #don't need to resize because logos are saved with size 330x330
        #the logo doubles as its own mask so transparent pixels keep the template visible
        base.paste(overlay_img, LOGO_POSITIONS[index], overlay_img)

    # ---- Add certification names ----
    name_font = ImageFont.truetype(FONT_PATH, 48)   #same font for every name: load it once
    for index, cert_name in enumerate(cert_names):
        draw_wrapped_text(
            draw=draw,
            position=NAME_POSITIONS[index],
            text=cert_name,
            font=name_font,
            max_width=620,
            fill=(0, 0, 153),   #certification name color
            line_spacing=8
        )

    #draw salary range
    draw_wrapped_text(
        draw=draw,
        position=SALARY_POSITION,
        text='Salary range: ' + salary_range,
        font=ImageFont.truetype(FONT_PATH, 60),
        max_width=800,
        fill=(153, 153, 0)   #text color
    )
    #draw footer
    draw_wrapped_text(
        draw=draw,
        position=FOOTER_URL_POSITION,
        text=page_link,
        font=ImageFont.truetype(FONT_PATH, 20),
        max_width=620,
        fill=(0, 0, 255)
    )

    base.convert("RGB").save(output_path, "PNG")
    print(f"Final image saved to: {output_path}")


In [129]:
#share new post with 1 new uploaded image
def share_my_img_2_LI(asset, owner_id, access_token, post_content, timeout=30):
    """
    Publish a public LinkedIn post containing one already-uploaded image.

    Sends a UGC post with media category "IMAGE" to LI_URI + 'v2/ugcPosts'.

    Args:
        asset: asset identifier placed in the post's media entry.
        owner_id: URN used as the author of the post.
        access_token: complete value of the Authorization header (it is sent
            unchanged, so it must already include any 'Bearer ' prefix).
        post_content: text of the post.
        timeout: seconds to wait for LinkedIn before giving up, so that a
            stalled connection cannot hang a scheduled run forever.

    Returns:
        'ok' when LinkedIn answers with a 2xx status, 'failed' otherwise
        (including when the request itself fails).
    """
    payload = {
        "author": owner_id,
        "lifecycleState": "PUBLISHED",
        "specificContent": {
            "com.linkedin.ugc.ShareContent": {
                "shareCommentary": {
                    "text": post_content
                },
                "shareMediaCategory": "IMAGE",
                "media": [
                    {
                        "status": "READY",
                        "media": asset
                    }
                ]
            }
        },
        "visibility": {
            "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
        }
    }
    li_headers = {
        'Authorization': access_token,
        'LinkedIn-Version': LI_VERSION,
        'Content-Type': 'application/json'
    }
    try:
        response = requests.post(LI_URI + 'v2/ugcPosts', json=payload, headers=li_headers, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(e)
        return 'failed'

    if 200 <= response.status_code < 300:
        print("The image was shared successfully!")
        return 'ok'

    print("The image was shared failed.")
    # Keep the details of the rejection: without them a failed post cannot be diagnosed.
    print("Status Code:", response.status_code)
    print("Response Body:", response.text)
    return 'failed'

In [130]:
#upload image to LI
def upload_img_2_LI_and_share(li_access_token, file_path, upload_detail, owner_id, post_content, timeout=60):
    """
    Upload an image file to LinkedIn, then publish a post that uses it.

    Args:
        li_access_token: complete value of the Authorization header.
        file_path: path of the image file to upload.
        upload_detail: upload registration data; must provide
            ['uploadMechanism']['com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest']['uploadUrl']
            and ['asset'].
        owner_id: URN of the post author, forwarded to share_my_img_2_LI().
        post_content: text of the post, forwarded to share_my_img_2_LI().
        timeout: seconds to wait for the upload request before giving up.

    Returns:
        'ok' when both the upload and the share succeed, 'failed' otherwise.
    """
    import mimetypes

    # Validate the registration data first: an error payload from the
    # registration step lacks these keys and must not crash the run.
    try:
        upload_url = upload_detail['uploadMechanism']['com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest']['uploadUrl']
        asset = upload_detail['asset']
    except (KeyError, TypeError):
        print("Error: upload detail does not contain an upload URL and asset")
        return 'failed'

    # Declare the real type of the file (the carousel slides are PNG)
    # instead of the invalid 'image/jpg'.
    content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'

    #upload the image https://learn.microsoft.com/en-us/linkedin/consumer/integrations/self-serve/share-on-linkedin#upload-image-or-video-binary-file
    # NOTE(review): the linked documentation uploads the raw binary file
    # (curl --upload-file); this sends a multipart/form-data POST - confirm
    # that uploaded images render correctly.
    try:
        with open(file_path, 'rb') as file:
            files = {'file': (file.name, file, content_type)}
            headers = {
                'Authorization': li_access_token,
                'LinkedIn-Version': LI_VERSION
            }
            response = requests.post(upload_url, files=files, headers=headers, timeout=timeout)
    except FileNotFoundError:
        print(f"Error: File not found at path: {file_path}")
        return 'failed'
    except requests.exceptions.RequestException as e:
        print(f"Error during upload: {e}")
        return 'failed'

    # The file is closed at this point; it is not needed for the share step.
    if not (200 <= response.status_code < 300):
        print("File upload failed.")
        return 'failed'

    print("File uploaded successfully!")
    time.sleep(5)   #delay 5 seconds for image going through LI system
    return share_my_img_2_LI(asset, owner_id, li_access_token, post_content)
    

In [131]:
def find_app_and_page_info(_category):
    """
    Look up the LinkedIn Page of a category and the app it belongs to.

    Args:
        _category: category name stored in the 'category' field of
            tb_linkedin_page.

    Returns:
        A tuple (app_info, page_info). When the page or its app cannot be
        found the tuple is (None, None), so the result can always be
        unpacked into two names.
    """
    #find app token and page linking to this category
    page_info = tb_linkedin_page.find_one({'category': _category})
    if page_info is None:
        print('There is no page info for category: ' + _category)
        return None, None
    #find app info
    app_info = tb_linkedin_app.find_one({'app_id': page_info['app_id']})
    if app_info is None:
        print('There is no app info for category: ' + _category)
        return None, None
    return app_info, page_info

In [132]:
def create_carousel_images(_category):
    """
    Pick one unposted post of a category and generate its carousel images.

    A post qualifies when its 'posted_li_img' is 0. The function stops early
    (after printing the reason) when there is no such post, the post has no
    'symbols', one of its certifications is unknown, or no LinkedIn page/app
    is configured for the category.

    Args:
        _category: category of certifications, e.g. 'AI'.

    Returns:
        Always None; the result is the set of image files written by
        draw_carousel_images().
    """
    post_details = tb_advertising_posts.find_one({'posted_li_img':0, 'category': _category})
    print(type(post_details), post_details)

    if post_details is None:
        print('There is no post for Image content of category: ' + _category)
        return None
    if 'symbols' not in post_details:
        print('There is no symbol for Image content of category: ' + _category)
        return None
    # 'symbols' is a comma-separated string; tolerate spaces around the commas
    # and ignore empty entries such as a trailing comma.
    symbols = [part.strip() for part in post_details['symbols'].split(',') if part.strip()]
    #find name of certifications
    #NOTE(review): cert_names / cert_links are only collected here; the
    #LinkedIn posting step that would consume them is still commented out.
    cert_names = []
    cert_links = []
    for symbol in symbols:
        cert_details = tb_cert_metadata.find_one({'symbol':symbol})
        if cert_details is None:
            print('There is no detail for the certification: ' + symbol)
            return None
        cert_names.append(cert_details['name'])
        cert_links.append(cert_details['udemy_link'])

    # The lookup may report "not found" as a bare None; unpacking that
    # directly would raise TypeError, so check before unpacking.
    lookup = find_app_and_page_info(_category)
    if lookup is None:
        return None
    app_info, page_info = lookup
    if app_info is None or page_info is None:
        return None
    #create final carousels folder
    draw_carousel_images(page_info['link'], post_details)
    return None

#manual test: generate the carousel images for the 'AI' category
#(runs immediately when this cell is executed: queries the database and writes image files)
create_carousel_images('AI')

<class 'dict'> {'_id': ObjectId('6975c98da8cb39cb96c9f820'), 'category': 'AI', 'hook': 'From curious about AI -> trusted AI practitioner in months.', 'problems': 'Many professionals get stuck watching videos but never feel ready for real AI roles.', 'insight': 'Certifications create structure. They force you to think like an engineer, not a hobbyist.', 'value': 'AWS and Microsoft AI certs train you to design, deploy, and evaluate real AI solutions at scale.', 'roles': 'AI Engineer, Cloud AI Specialist, Machine Learning Associate', 'salary': '$85,000 - $150,000', 'symbols': 'AWS_AIF,AZ_AI_900', 'posted_li_img': 0}
Final image saved to: /Users/sang/Documents/Source/Python/python_webscrap/cert_exam/output/6975c98da8cb39cb96c9f820


In [133]:
#get 1 random post, create carousel images, and share to LinkedIn
# def post_1_carousel_by_category_2_LI(_category):
    
    #create CTA link
    # cta_links = []
    # index = 1
    # for cert_name in cert_names:
    #     cta_links.append(str(index) + '. ' + cert_name + ': ' + cert_links[index-1] + '\n')
    #     index = index + 1
    # cta_links_text = CTA_LINKS + ''.join(cta_links)
    # post_content = post_details['content'] + cta_links_text
    # #upload image to to Linkedin Page
    # li_access_token = 'Bearer ' + app_info['access_token']
    # li_headers = {
    #     'Authorization': li_access_token,
    #     'LinkedIn-Version': LI_VERSION,
    #     'Content-Type': 'application/json'
    # }
    # owner_id = "urn:li:organization:" + page_info['page_id']    #my page
    # upload_link = get_LI_upload_link(owner_id, li_headers)
    # result_upload = upload_img_2_LI_and_share(li_access_token, new_img_path, upload_link['value'], owner_id, post_content)
    # if result_upload == 'ok':
    #     #the post is successfully up
    #     #delete that image
    #     if os.path.exists(new_img_path):
    #         os.remove(new_img_path)
    #     print('==== The image is shared in LI Page for category: ' + _category)
    #     #add flag to that post to indicate done
    #     post_details['posted_li_img'] = 1
    #     tb_advertising_posts.replace_one({"_id": post_details['_id']}, post_details)
    #     return
    # else:
    #     print('There is an error for posting in LI page for category: ' + _category)
    #     return
#test


In [134]:
# if __name__ == '__main__':
#     args = sys.argv
#     print(args)
#     if len(args) > 1:
#         category = args[1]   #category of certifications
#         print(category)
#         post_1_image_by_category(category)

In [135]:
# Close the MongoDB connection opened at the top of the notebook.
db_client.close()